1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use crate::{MirRelationExpr, MirScalarExpr};
use repr::{Datum, Row};

/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequences of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct MapFilterProject {
    /// A sequence of expressions that should be appended to the row.
    ///
    /// Many of these expressions may not be produced in the output,
    /// and may only be present as common subexpressions.
    pub expressions: Vec<MirScalarExpr>,
    /// Expressions that must evaluate to `Datum::True` for the output
    /// row to be produced.
    ///
    /// Each entry is prepended with a column identifier indicating
    /// the column *before* which the predicate should first be applied.
    /// Most commonly this would be one plus the largest column identifier
    /// in the predicate's support, but it could be larger to implement
    /// guarded evaluation of predicates.
    ///
    /// This list should be sorted by the first field.
    pub predicates: Vec<(usize, MirScalarExpr)>,
    /// A sequence of column identifiers whose data form the output row.
    pub projection: Vec<usize>,
    /// The expected number of input columns.
    ///
    /// This is needed to enure correct identification of newly formed
    /// columns in the output.
    pub input_arity: usize,
}

impl MapFilterProject {
    /// Create a no-op operator for an input of a supplied arity.
    pub fn new(input_arity: usize) -> Self {
        Self {
            expressions: Vec::new(),
            predicates: Vec::new(),
            projection: (0..input_arity).collect(),
            input_arity,
        }
    }

    /// Given two mfps, return an mfp that applies one
    /// followed by the other.
    /// Note that the arguments are in the opposite order
    /// from how function composition is usually written in mathematics.
    pub fn compose(before: Self, after: Self) -> Self {
        let (m, f, p) = after.into_map_filter_project();
        before.map(m).filter(f).project(p)
    }

    /// True if the operator describes the identity transformation.
    pub fn is_identity(&self) -> bool {
        self.expressions.is_empty()
            && self.predicates.is_empty()
            && self.projection.len() == self.input_arity
            && self.projection.iter().enumerate().all(|(i, p)| i == *p)
    }

    /// Retain only the indicated columns in the presented order.
    pub fn project<I>(mut self, columns: I) -> Self
    where
        I: IntoIterator<Item = usize> + std::fmt::Debug,
    {
        self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
        self
    }

    /// Retain only rows satisfying these predicates.
    ///
    /// This method introduces predicates as eagerly as they can be evaluated,
    /// which may not be desired for predicates that may cause exceptions.
    /// If fine manipulation is required, the predicates can be added manually.
    pub fn filter<I>(mut self, predicates: I) -> Self
    where
        I: IntoIterator<Item = MirScalarExpr>,
    {
        for mut predicate in predicates {
            // Correct column references.
            predicate.permute(&self.projection[..]);

            // Validate column references.
            assert!(predicate
                .support()
                .into_iter()
                .all(|c| c < self.input_arity + self.expressions.len()));

            // Insert predicate as eagerly as it can be evaluated:
            // just after the largest column in its support is formed.
            let max_support = predicate
                .support()
                .into_iter()
                .max()
                .map(|c| c + 1)
                .unwrap_or(0);
            self.predicates.push((max_support, predicate))
        }
        // Stable sort predicates by position at which they take effect.
        // We put literal errors at the end as a stop-gap to avoid erroring
        // before we are able to evaluate any predicates that might prevent it.
        self.predicates
            .sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
        self
    }

    /// Append the result of evaluating expressions to each row.
    pub fn map<I>(mut self, expressions: I) -> Self
    where
        I: IntoIterator<Item = MirScalarExpr>,
    {
        for mut expression in expressions {
            // Correct column references.
            expression.permute(&self.projection[..]);

            // Validate column references.
            assert!(expression
                .support()
                .into_iter()
                .all(|c| c < self.input_arity + self.expressions.len()));

            // Introduce expression and produce as output.
            self.expressions.push(expression);
            self.projection
                .push(self.input_arity + self.expressions.len() - 1);
        }

        self
    }

    /// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
    pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
        let predicates = self
            .predicates
            .into_iter()
            .map(|(_pos, predicate)| predicate)
            .collect();
        (self.expressions, predicates, self.projection)
    }

    /// As the arguments to `Map`, `Filter`, and `Project` operators.
    ///
    /// In principle, this operator can be implemented as a sequence of
    /// more elemental operators, likely less efficiently.
    pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
        self.clone().into_map_filter_project()
    }

    /// Determines if a scalar expression must be equal to a literal datum.
    pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum> {
        for (_pos, predicate) in self.predicates.iter() {
            if let MirScalarExpr::CallBinary {
                func: crate::BinaryFunc::Eq,
                expr1,
                expr2,
            } = predicate
            {
                if let Some(Ok(datum1)) = expr1.as_literal() {
                    if &**expr2 == expr {
                        return Some(datum1);
                    }
                }
                if let Some(Ok(datum2)) = expr2.as_literal() {
                    if &**expr1 == expr {
                        return Some(datum2);
                    }
                }
            }
        }
        None
    }

    /// Determines if a sequence of scalar expressions must be equal to a literal row.
    ///
    /// This method returns `None` on an empty `exprs`, which might be surprising, but
    /// seems to line up with its callers' expectations of that being a non-constraint.
    /// The caller knows if `exprs` is empty, and can modify their behavior appopriately.
    /// if they would rather have a literal empty row.
    pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
        if exprs.is_empty() {
            return None;
        }
        let mut row = Row::default();
        for expr in exprs {
            if let Some(literal) = self.literal_constraint(expr) {
                row.push(literal);
            } else {
                return None;
            }
        }
        Some(row)
    }

    /// Extracts any MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract any maps, filters, and
    /// projections, which will be return as `Self`. If there are no maps,
    /// filters, or projections the method will return an identity operator.
    ///
    /// The extracted expressions may contain temporal predicates, and one
    /// should be careful to apply them blindly.
    pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        // TODO: This could become iterative rather than recursive if
        // we were able to fuse MFP operators from below, rather than
        // from above.
        match expr {
            MirRelationExpr::Map { input, scalars } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            x => (Self::new(x.arity()), x),
        }
    }

    /// Extracts an error-free MapFilterProject at the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an identity operator.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
        match expr {
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.map(scalars.iter().cloned()), expr)
            }
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.filter(predicates.iter().cloned()), expr)
            }
            MirRelationExpr::Project { input, outputs } => {
                let (mfp, expr) = Self::extract_from_expression(input);
                (mfp.project(outputs.iter().cloned()), expr)
            }
            x => (Self::new(x.arity()), x),
        }
    }

    /// Removes an error-free MapFilterProject from the root of the expression.
    ///
    /// The expression will be modified to extract maps, filters, and projects
    /// from the root of the expression, which will be returned as `Self`. The
    /// extraction will halt if a Map or Filter containing a literal error is
    /// reached. Otherwise, the method will return an
    /// identity operator, and the expression will remain unchanged.
    ///
    /// This method is meant to be used during optimization, where it is
    /// necessary to avoid moving around maps and filters with errors.
    pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
        match expr {
            MirRelationExpr::Map { input, scalars }
                if scalars.iter().all(|s| !s.is_literal_err()) =>
            {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            MirRelationExpr::Filter { input, predicates }
                if predicates.iter().all(|p| !p.is_literal_err()) =>
            {
                let mfp = Self::extract_non_errors_from_expr_mut(input)
                    .filter(predicates.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            MirRelationExpr::Project { input, outputs } => {
                let mfp =
                    Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
                *expr = input.take_dangerous();
                mfp
            }
            x => Self::new(x.arity()),
        }
    }

    /// Extracts temporal predicates into their own `Self`.
    ///
    /// Expressions that are used by the temporal predicates are exposed by `self.projection`,
    /// though there could be justification for extracting them as well if they are otherwise
    /// unused.
    ///
    /// This separation is valuable when the execution cannot be fused into one operator.
    pub fn extract_temporal(&mut self) -> Self {
        // Optimize the expression, as it is only post-optimization that we can be certain
        // that temporal expressions are restricted to filters. We could relax this in the
        // future to be only `inline_expressions` and `remove_undemanded`, but optimization
        // seems to be the best fit at the moment.
        self.optimize();

        // Assert that we no longer have temporal expressions to evaluate. This should only
        // occur if the optimization above results with temporal expressions yielded in the
        // output, which is out of spec for how the type is meant to be used.
        assert!(!self.expressions.iter().any(|e| e.contains_temporal()));

        // Extract temporal predicates from `self.predicates`.
        let mut temporal_predicates = Vec::new();
        self.predicates.retain(|(_position, predicate)| {
            if predicate.contains_temporal() {
                temporal_predicates.push(predicate.clone());
                false
            } else {
                true
            }
        });

        // Determine extended input columns used by temporal filters.
        let mut support = HashSet::new();
        for predicate in temporal_predicates.iter() {
            support.extend(predicate.support());
        }

        // Discover the locations of these columns after `self.projection`.
        let old_projection_len = self.projection.len();
        let mut new_location = HashMap::with_capacity(support.len());
        for original in support.iter() {
            if let Some(position) = self.projection.iter().position(|x| x == original) {
                new_location.insert(*original, position);
            } else {
                new_location.insert(*original, self.projection.len());
                self.projection.push(*original);
            }
        }
        // Permute references in extracted predicates to their new locations.
        for predicate in temporal_predicates.iter_mut() {
            predicate.permute_map(&new_location);
        }

        // Form a new `Self` containing the temporal predicates to return.
        Self::new(self.projection.len())
            .filter(temporal_predicates)
            .project(0..old_projection_len)
    }

    /// Returns `self`, and leaves behind an identity operator that acts on its output.
    pub fn take(&mut self) -> Self {
        let mut identity = Self::new(self.projection.len());
        std::mem::swap(self, &mut identity);
        identity
    }

    /// Convert the `MapFilterProject` into a staged evaluation plan.
    ///
    /// The main behavior is extract temporal predicates, which cannot be evaluated
    /// using the standard machinery.
    pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
        plan::MfpPlan::create_from(self)
    }
}

impl MapFilterProject {
    /// Partitions `self` into two instances, one of which can be eagerly applied.
    ///
    /// The `available` argument indicates which input columns are available (keys)
    /// and in which positions (values). This information may allow some maps and
    /// filters to execute. The `input_arity` argument reports the total number of
    /// input columns (which may include some not present in `available`)
    ///
    /// This method partitions `self` in two parts, `(before, after)`, where `before`
    /// can be applied on columns present as keys in `available`, and `after` must
    /// await the introduction of the other input columns.
    ///
    /// The `before` instance will *append* any columns that can be determined from
    /// `available` but will project away any of these columns that are not needed by
    /// `after`. Importantly, this means that `before` will leave intact *all* input
    /// columns including those not referenced in `available`.
    ///
    /// The `after` instance will presume all input columns are available, followed
    /// by the appended columns of the `before` instance. It may be that some input
    /// columns can be projected away in `before` if `after` does not need them, but
    /// we leave that as something the caller can apply if needed (it is otherwise
    /// complicated to negotiate which input columns `before` should retain).
    ///
    /// To correctly reconstruct `self` from `before` and `after`, one must introduce
    /// additional input columns, permute all input columns to their locations as
    /// expected by `self`, follow this by new columns appended by `before`, and
    /// remove all other columns that may be present.
    ///
    /// # Example
    ///
    /// ```rust
    /// use expr::{BinaryFunc, MapFilterProject, MirScalarExpr};
    ///
    /// // imagine an action on columns (a, b, c, d).
    /// let original = MapFilterProject::new(4).map(vec![
    ///    MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::AddInt64),
    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64),
    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
    /// ]).project(vec![6]);
    ///
    /// // Imagine we start with columns (b, x, a, y, c).
    /// //
    /// // The `partition` method requires a map from *expected* input columns to *actual*
    /// // input columns. In the example above, the columns a, b, and c exist, and are at
    /// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
    /// let mut available_columns = std::collections::HashMap::new();
    /// available_columns.insert(0, 2);
    /// available_columns.insert(1, 0);
    /// available_columns.insert(2, 4);
    /// // Partition `original` using the available columns and current input arity.
    /// // This informs `partition` which columns are available, where they can be found,
    /// // and how many columns are not relevant but should be preserved.
    /// let (before, after) = original.partition(available_columns, 5);
    ///
    /// // `before` sees all five input columns, and should append `a + b + c`.
    /// assert_eq!(before, MapFilterProject::new(5).map(vec![
    ///    MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), BinaryFunc::AddInt64),
    ///    MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
    /// ]).project(vec![0, 1, 2, 3, 4, 6]));
    ///
    /// // `after` expects to see `(a, b, c, d, a + b + c)`.
    /// assert_eq!(after, MapFilterProject::new(5).map(vec![
    ///    MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64)
    /// ]).project(vec![5]));
    ///
    /// // To reconstruct `self`, we must introduce the columns that are not present,
    /// // and present them in the order intended by `self`. In this example, we must
    /// // introduce colunm d and permute the columns so that they begin (a, b, c, d).
    /// // The columns x and y must be projected away, and any columns introduced by
    /// // `begin` must be retained in their current order.
    ///
    /// // The `after` instance expects to be provided with all inputs, but it
    /// // may not need all inputs. The `demand()` and `permute()` methods can
    /// // optimize the representation.
    /// ```
    pub fn partition(self, available: HashMap<usize, usize>, input_arity: usize) -> (Self, Self) {
        // Map expressions, filter predicates, and projections for `before` and `after`.
        let mut before_expr = Vec::new();
        let mut before_pred = Vec::new();
        let mut before_proj = Vec::new();
        let mut after_expr = Vec::new();
        let mut after_pred = Vec::new();
        let mut after_proj = Vec::new();

        // Track which output columns must be preserved in the output of `before`.
        let mut demanded = HashSet::new();
        demanded.extend(0..self.input_arity);
        demanded.extend(self.projection.iter());

        // Determine which map expressions can be computed from the available subset.
        // Some expressions may depend on other expressions, but by evaluating them
        // in forward order we should accurately determine the available expressions.
        let mut available_expr = vec![false; self.input_arity];
        // Initialize available columns from `available`, which is then not used again.
        for index in available.keys() {
            available_expr[*index] = true;
        }
        for expr in self.expressions.into_iter() {
            // We treat an expression as available if its supporting columns are available,
            // and if it is not a literal (we want to avoid pushing down literals). This
            // choice is ad-hoc, but the intent is that we partition the operators so
            // that we can reduce the row representation size and total computation.
            // Pushing down literals harms the former and does nothing for the latter.
            // In the future, we'll want to have a harder think about this trade-off, as
            // we are certainly making sub-optimal decisions by pushing down all available
            // work.
            // TODO(mcsherry): establish better principles about what work to push down.
            let is_available =
                expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
            if is_available {
                before_expr.push(expr);
            } else {
                demanded.extend(expr.support());
                after_expr.push(expr);
            }
            available_expr.push(is_available);
        }

        // Determine which predicates can be computed from the available subset.
        for (_when, pred) in self.predicates.into_iter() {
            let is_available = pred.support().into_iter().all(|i| available_expr[i]);
            if is_available {
                before_pred.push(pred);
            } else {
                demanded.extend(pred.support());
                after_pred.push(pred);
            }
        }

        // Map from prior output location to location in un-projected `before`.
        // This map is used to correct references in `before` but it should be
        // adjusted to reflect `before`s projection prior to use in `after`.
        let mut before_map = available;
        // Input columns include any additional undescribed columns that may
        // not be captured by the `available` argument, so we must independently
        // track the current number of columns (vs relying on `before_map.len()`).
        let mut input_columns = input_arity;
        for index in self.input_arity..available_expr.len() {
            if available_expr[index] {
                before_map.insert(index, input_columns);
                input_columns += 1;
            }
        }

        // Permute the column references in `before` expressions and predicates.
        for expr in before_expr.iter_mut() {
            expr.permute_map(&before_map);
        }
        for pred in before_pred.iter_mut() {
            pred.permute_map(&before_map);
        }

        // Demand information determines `before`s output projection.
        // Specifically, we produce all input columns in the output, as well as
        // any columns that are available and demanded.
        before_proj.extend(0..input_arity);
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // we should produce it as output.
            if available_expr[index] && demanded.contains(&index) {
                // Use the new location of `index`.
                before_proj.push(before_map[&index]);
            }
        }

        // Map from prior output locations to location in post-`before` columns.
        // This map is used to correct references in `after`.
        // The presumption is that `after` will be presented with all input columns,
        // followed by the output columns introduced by `before` in order.
        let mut after_map = HashMap::new();
        for index in 0..self.input_arity {
            after_map.insert(index, index);
        }
        for index in self.input_arity..available_expr.len() {
            // If an intermediate result is both available and demanded,
            // it was produced as output.
            if available_expr[index] && demanded.contains(&index) {
                // We expect to find the output as far after `self.input_arity` as
                // it was produced after `input_arity` in the output of `before`.
                let location = self.input_arity
                    + (before_proj
                        .iter()
                        .position(|x| x == &before_map[&index])
                        .unwrap()
                        - input_arity);
                after_map.insert(index, location);
            }
        }
        // We must now re-map the remaining non-demanded expressions, which are
        // contiguous rather than potentially interspersed.
        for index in self.input_arity..available_expr.len() {
            if !available_expr[index] {
                after_map.insert(index, after_map.len());
            }
        }

        // Permute the column references in `after` expressions and predicates.
        for expr in after_expr.iter_mut() {
            expr.permute_map(&after_map);
        }
        for pred in after_pred.iter_mut() {
            pred.permute_map(&after_map);
        }
        // Populate `after` projection with the new locations of `self.projection`.
        for index in self.projection {
            after_proj.push(after_map[&index]);
        }

        // Form and return the before and after MapFilterProject instances.
        let before = Self::new(input_arity)
            .map(before_expr)
            .filter(before_pred)
            .project(before_proj.clone());
        let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
            .map(after_expr)
            .filter(after_pred)
            .project(after_proj);
        (before, after)
    }

    /// Lists input columns whose values are used in outputs.
    ///
    /// It is entirely appropriate to determine the demand of an instance
    /// and then both apply a projection to the subject of the instance and
    /// `self.permute` this instance.
    pub fn demand(&self) -> HashSet<usize> {
        let mut demanded = HashSet::new();
        for (_index, pred) in self.predicates.iter() {
            demanded.extend(pred.support());
        }
        demanded.extend(self.projection.iter().cloned());
        for index in (0..self.expressions.len()).rev() {
            if demanded.contains(&(self.input_arity + index)) {
                demanded.extend(self.expressions[index].support());
            }
        }
        demanded.retain(|col| col < &self.input_arity);
        demanded
    }

    /// Update input column references, due to an input projection or permutation.
    ///
    /// The `shuffle` argument remaps expected column identifiers to new locations,
    /// with the expectation that `shuffle` describes all input columns, and so the
    /// intermediate results will be able to start at position `shuffle.len()`.
    ///
    /// The supplied `shuffle` may not list columns that are not "demanded" by the
    /// instance, and so we should ensure that `self` is optimized to not reference
    /// columns that are not demanded.
    pub fn permute(&mut self, mut shuffle: HashMap<usize, usize>, new_input_arity: usize) {
        let (mut map, mut filter, mut project) = self.as_map_filter_project();
        for index in 0..map.len() {
            // Intermediate columns are just shifted.
            shuffle.insert(self.input_arity + index, new_input_arity + index);
        }
        for expr in map.iter_mut() {
            expr.permute_map(&shuffle);
        }
        for pred in filter.iter_mut() {
            pred.permute_map(&shuffle);
        }
        for proj in project.iter_mut() {
            *proj = shuffle[proj];
        }
        *self = Self::new(new_input_arity)
            .map(map)
            .filter(filter)
            .project(project)
    }
}

// Optimization routines.
impl MapFilterProject {
    /// Optimize the internal expression evaluation order.
    ///
    /// This method performs several optimizations that are meant to streamline
    /// the execution of the `MapFilterProject` instance, but not to alter its
    /// semantics. This includes extracting expressions that are used multiple
    /// times, inlining those that are not, and removing expressions that are
    /// unreferenced.
    ///
    /// This method will inline all temporal expressions, and remove any columns
    /// that are not demanded by the output, which should transform any temporal
    /// filters to a state where the temporal expressions exist only in the list
    /// of predicates.
    ///
    /// # Example
    ///
    /// This example demonstrates how the re-use of one expression, converting
    /// column 1 from a string to an integer, can be extracted and the results
    /// shared among the two uses. This example is used for each of the steps
    /// along the optimization path.
    ///
    /// ```rust
    /// use expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Demonstrate extraction of common expressions (here: parsing strings).
    /// let mut map_filter_project = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///     ])
    ///     .project(vec![3,4,5,6]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///     ])
    ///     .project(vec![3,4,6,7]);
    ///
    /// // Optimize the expression.
    /// map_filter_project.optimize();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    pub fn optimize(&mut self) {
        // Optimization memoizes individual `ScalarExpr` expressions that
        // are sure to be evaluated, canonicalizes references to the first
        // occurrence of each, inlines expressions that have a reference
        // count of one, and then removes any expressions that are not
        // referenced.
        self.memoize_expressions();
        self.inline_expressions();
        self.remove_undemanded();

        // Re-build `self` from parts to restore evaluation order invariants.
        let (map, filter, project) = self.as_map_filter_project();
        *self = Self::new(self.input_arity)
            .map(map)
            .filter(filter)
            .project(project);
    }

    /// Place each certainly evaluated expression in its own column.
    ///
    /// This method places each non-trivial, certainly evaluated expression
    /// in its own column, and deduplicates them so that all references to
    /// the same expression reference the same column.
    ///
    /// This tranformation is restricted to expressions we are certain will
    /// be evaluated, which does not include expressions in `if` statements.
    ///
    /// # Example
    ///
    /// This example demonstrates how memoization notices `MirScalarExpr`s
    /// that are used multiple times, and ensures that each are extracted
    /// into columns and then referenced by column. This pass does not try
    /// to minimize the occurrences of column references, which will happen
    /// in inliniing.
    ///
    /// ```rust
    /// use expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Demonstrate extraction of common expressions (here: parsing strings).
    /// let mut map_filter_project = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///     ])
    ///     .project(vec![3,4,5,6]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(7),
    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(10),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
    /// map_filter_project.memoize_expressions();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    ///
    /// Expressions may not be memoized if they are not certain to be evaluated,
    /// for example if they occur in conditional branches of a `MirScalarExpr::If`.
    ///
    /// ```rust
    /// use expr::{MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Demonstrate extraction of unconditionally evaluated expressions, as well as
    /// // the non-extraction of common expressions guarded by conditions.
    /// let mut map_filter_project = MapFilterProject::new(2)
    ///     .map(vec![
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
    ///         },
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
    ///         },
    ///     ]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(2)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt),
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(2)),
    ///             then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
    ///         },
    ///         MirScalarExpr::column(3),
    ///         MirScalarExpr::If {
    ///             cond: Box::new(MirScalarExpr::column(2)),
    ///             then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
    ///             els:  Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
    ///         },
    ///         MirScalarExpr::column(5),
    ///     ])
    ///     .project(vec![0,1,4,6]);
    ///
    /// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
    /// map_filter_project.memoize_expressions();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    pub fn memoize_expressions(&mut self) {
        // Record the mapping from starting column references to new column
        // references.
        let mut remaps = HashMap::new();
        for index in 0..self.input_arity {
            remaps.insert(index, index);
        }
        let mut new_expressions = Vec::new();

        // We follow the same order as for evaluation, to ensure that all
        // column references exist in time for their evaluation. We could
        // prioritize predicates, but we would need to be careful to chase
        // down column references to expressions and memoize those as well.
        let mut expression = 0;
        for (support, predicate) in self.predicates.iter_mut() {
            while self.input_arity + expression < *support {
                self.expressions[expression].permute_map(&remaps);
                memoize_expr(
                    &mut self.expressions[expression],
                    &mut new_expressions,
                    self.input_arity,
                );
                remaps.insert(
                    self.input_arity + expression,
                    self.input_arity + new_expressions.len(),
                );
                new_expressions.push(self.expressions[expression].clone());
                expression += 1;
            }
            predicate.permute_map(&remaps);
            memoize_expr(predicate, &mut new_expressions, self.input_arity);
        }
        while expression < self.expressions.len() {
            self.expressions[expression].permute_map(&remaps);
            memoize_expr(
                &mut self.expressions[expression],
                &mut new_expressions,
                self.input_arity,
            );
            remaps.insert(
                self.input_arity + expression,
                self.input_arity + new_expressions.len(),
            );
            new_expressions.push(self.expressions[expression].clone());
            expression += 1;
        }

        self.expressions = new_expressions;
        for proj in self.projection.iter_mut() {
            *proj = remaps[proj];
        }
    }

    /// This method inlines expressions with a single use.
    ///
    /// This method only inlines expressions; it does not delete expressions
    /// that are no longer referenced. The `remove_undemanded()` method does
    /// that, and should likely be used after this method.
    ///
    /// Inlining replaces column references when the refered-to item is either
    /// another column reference, or the only referrer of its referent. This
    /// is most common after memoization has atomized all expressions to seek
    /// out re-use: inlining re-assembles expressions that were not helpfully
    /// shared with other expressions.
    ///
    /// # Example
    ///
    /// In this example, we see that with only a single reference to columns
    /// 0 and 2, their parsing can each be inlined. Similarly, column references
    /// can be cleaned up among expressions, and in the final projection.
    ///
    /// Also notice the remaining expressions, which can be cleaned up in a later
    /// pass (the `remove_undemanded` method).
    ///
    /// ```rust
    /// use expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Use the output from first `memoize_expression` example.
    /// let mut map_filter_project = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(7),
    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(10),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// // Inline expressions that are referenced only once.
    /// map_filter_project.inline_expressions();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    pub fn inline_expressions(&mut self) {
        // Local copy of input_arity to avoid borrowing `self` in closures.
        let input_arity = self.input_arity;
        // Reference counts track the number of places that a reference occurs.
        let mut reference_count = vec![0; input_arity + self.expressions.len()];
        // Increment reference counts for each use
        for expr in self.expressions.iter() {
            for col in expr.support().into_iter() {
                reference_count[col] += 1;
            }
        }
        for (_, pred) in self.predicates.iter() {
            for col in pred.support().into_iter() {
                reference_count[col] += 1;
            }
        }
        for proj in self.projection.iter() {
            reference_count[*proj] += 1;
        }

        // Determine which expressions should be inlined because they reference temporal expressions.
        let mut is_temporal = vec![false; input_arity];
        for expr in self.expressions.iter() {
            // An express may contain a temporal expression, or reference a column containing such.
            is_temporal.push(
                expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
            );
        }

        // Inline only those columns that 1. are expressions not inputs, and
        // 2a. are column references or literals or 2b. have a refcount of 1,
        // or 2c. reference temporal expressions (which cannot be evaluated).
        let mut should_inline = vec![false; reference_count.len()];
        for i in (input_arity..reference_count.len()).rev() {
            if let MirScalarExpr::Column(c) = self.expressions[i - input_arity] {
                should_inline[i] = true;
                // The reference count of the referenced column should be
                // incremented with the number of references
                // `self.expressions[i - input_arity]` has.
                // Subtract 1 because `self.expressions[i - input_arity]` is
                // itself a reference.
                reference_count[c] += reference_count[i] - 1;
            } else {
                should_inline[i] = reference_count[i] == 1 || is_temporal[i];
            }
        }
        // Inline expressions per `should_inline`.
        for index in 0..self.expressions.len() {
            let (prior, expr) = self.expressions.split_at_mut(index);
            expr[0].visit_mut_post(&mut |e| {
                if let MirScalarExpr::Column(i) = e {
                    if should_inline[*i] {
                        *e = prior[*i - input_arity].clone();
                    }
                }
            });
        }
        for (_index, pred) in self.predicates.iter_mut() {
            let expressions = &self.expressions;
            pred.visit_mut_post(&mut |e| {
                if let MirScalarExpr::Column(i) = e {
                    if should_inline[*i] {
                        *e = expressions[*i - input_arity].clone();
                    }
                }
            });
        }
        // We can only inline column references in `self.projection`, but we should.
        for proj in self.projection.iter_mut() {
            if *proj >= self.input_arity {
                if let MirScalarExpr::Column(i) = self.expressions[*proj - self.input_arity] {
                    *proj = i;
                }
            }
        }
    }

    /// Removes unused expressions from `self.expressions`.
    ///
    /// Expressions are "used" if they are relied upon by any output columns
    /// or any predicates, even transitively. Any expressions that are not
    /// relied upon in this way can be discarded.
    ///
    /// # Example
    ///
    /// ```rust
    /// use expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
    /// // Use the output from `inline_expression` example.
    /// let mut map_filter_project = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///     ])
    ///     .project(vec![3,4,8,11]);
    ///
    /// let mut expected_optimized = MapFilterProject::new(5)
    ///     .map(vec![
    ///         MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
    ///         MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
    ///         MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
    ///     ])
    ///     .project(vec![3,4,6,7]);
    ///
    /// // Remove undemandedd expressions, streamlining the work done..
    /// map_filter_project.remove_undemanded();
    ///
    /// assert_eq!(
    ///     map_filter_project,
    ///     expected_optimized,
    /// );
    /// ```
    pub fn remove_undemanded(&mut self) {
        // Determine the demanded expressions to remove irrelevant ones.
        let mut demand = std::collections::HashSet::new();
        for (_index, pred) in self.predicates.iter() {
            demand.extend(pred.support());
        }
        // Start from the output columns as presumed demanded.
        // If this is not the case, the caller should project some away.
        demand.extend(self.projection.iter().cloned());
        // Proceed in *reverse* order, as expressions may depend on other
        // expressions that precede them.
        for index in (0..self.expressions.len()).rev() {
            if demand.contains(&(self.input_arity + index)) {
                demand.extend(self.expressions[index].support());
            }
        }

        // Maintain a map from initial column identifiers to locations
        // once we have removed undemanded expressions.
        let mut remap = HashMap::new();
        // This map only needs to map elements of `demand` to a new location,
        // but the logic is easier if we include all input columns (as the
        // new position is then determined by the size of the map).
        for index in 0..self.input_arity {
            remap.insert(index, index);
        }
        // Retain demanded expressions, and record their new locations.
        let mut new_expressions = Vec::new();
        for (index, expr) in self.expressions.drain(..).enumerate() {
            if demand.contains(&(index + self.input_arity)) {
                remap.insert(index + self.input_arity, remap.len());
                new_expressions.push(expr);
            }
        }
        self.expressions = new_expressions;

        // Update column identifiers; rebuild `Self` to re-establish any invariants.
        // We mirror `self.permute(&remap)` but we specifically want to remap columns
        // that are produced by `self.expressions` after the input columns.
        let (expressions, predicates, projection) = self.as_map_filter_project();
        *self = Self::new(self.input_arity)
            .map(expressions.into_iter().map(|mut e| {
                e.permute_map(&remap);
                e
            }))
            .filter(predicates.into_iter().map(|mut p| {
                p.permute_map(&remap);
                p
            }))
            .project(projection.into_iter().map(|c| remap[&c]));
    }
}

// TODO: move this elsewhere?
/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
///
/// A part of `expr` that is memoized is replaced by a reference to column
/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
/// refers to.
pub fn memoize_expr(
    expr: &mut MirScalarExpr,
    memoized_parts: &mut Vec<MirScalarExpr>,
    input_arity: usize,
) {
    expr.visit_mut_pre_post(
        &mut |e| {
            // We should not eagerly memoize `if` branches that might not be taken.
            // TODO: Memoize expressions in the intersection of `then` and `els`.
            if let MirScalarExpr::If { cond, .. } = e {
                return Some(vec![cond]);
            }
            None
        },
        &mut |e| {
            match e {
                MirScalarExpr::Column(_) | MirScalarExpr::Literal(_, _) => {
                    // Literals do not need to be memoized.
                }
                _ => {
                    if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
                        // Any complex expression that already exists as a prior column can
                        // be replaced by a reference to that column.
                        *e = MirScalarExpr::Column(input_arity + position);
                    } else {
                        // A complex expression that does not exist should be memoized, and
                        // replaced by a reference to the column.
                        memoized_parts.push(std::mem::replace(
                            e,
                            MirScalarExpr::Column(input_arity + memoized_parts.len()),
                        ));
                    }
                }
            }
        },
    )
}

pub mod util {
    use std::collections::HashMap;

    use crate::MirScalarExpr;

    /// Return the map associating columns in the logical,
    /// unthinned representation of a collection to columns in the
    /// thinned representation of the arrangement corresponding to `key`.
    ///
    /// Returns the permutation and the thinning
    /// expression that should be used to create the arrangement.
    ///
    /// The permutations and thinning expressions generated here will be tracked in
    /// `dataflow::plan::AvailableCollections`; see the
    /// documentation there for more details.
    pub fn permutation_for_arrangement<B: FromIterator<(usize, usize)>>(
        key: &[MirScalarExpr],
        unthinned_arity: usize,
    ) -> (B, Vec<usize>) {
        let columns_in_key: HashMap<_, _> = key
            .iter()
            .enumerate()
            .filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
            .collect();
        let mut input_cursor = key.len();
        let permutation = (0..unthinned_arity)
            .map(|c| {
                if let Some(c) = columns_in_key.get(&c) {
                    // Column is in key (and thus gone from the value
                    // of the thinned representation)
                    *c
                } else {
                    // Column remains in value of the thinned representation
                    input_cursor += 1;
                    input_cursor - 1
                }
            })
            .enumerate()
            .collect();
        let thinning = (0..unthinned_arity)
            .into_iter()
            .filter(|c| !columns_in_key.contains_key(&c))
            .collect();
        (permutation, thinning)
    }

    /// Given the permutations (see [`permutation_for_arrangement`] and
    /// (`dataflow::plan::AvailableCollections`) corresponding to two
    /// collections with the same key arity,
    /// computes the permutation for the result of joining them.
    pub fn join_permutations(
        key_arity: usize,
        stream_permutation: HashMap<usize, usize>,
        thinned_stream_arity: usize,
        lookup_permutation: HashMap<usize, usize>,
    ) -> HashMap<usize, usize> {
        let stream_arity = stream_permutation
            .keys()
            .cloned()
            .max()
            .map(|i| i + 1)
            .unwrap_or(0);
        let lookup_arity = lookup_permutation
            .keys()
            .cloned()
            .max()
            .map(|i| i + 1)
            .unwrap_or(0);

        (0..stream_arity + lookup_arity)
            .map(|i| {
                let location = if i < stream_arity {
                    stream_permutation[&i]
                } else {
                    let location_in_lookup = lookup_permutation[&(i - stream_arity)];
                    if location_in_lookup < key_arity {
                        location_in_lookup
                    } else {
                        location_in_lookup + thinned_stream_arity
                    }
                };
                (i, location)
            })
            .collect()
    }
}

pub mod plan {

    use std::collections::HashMap;

    use serde::{Deserialize, Serialize};

    use crate::{
        func, BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, NullaryFunc, UnaryFunc,
    };
    use repr::adt::numeric::Numeric;
    use repr::{Datum, Diff, Row, RowArena, ScalarType};

    /// A wrapper type which indicates it is safe to simply evaluate all expressions.
    #[derive(Clone, Debug, Serialize, Deserialize)]
    pub struct SafeMfpPlan {
        mfp: MapFilterProject,
    }

    impl SafeMfpPlan {
        pub fn permute(&mut self, map: HashMap<usize, usize>, new_arity: usize) {
            self.mfp.permute(map, new_arity);
        }
        /// Evaluates the linear operator on a supplied list of datums.
        ///
        /// The arguments are the initial datums associated with the row,
        /// and an appropriately lifetimed arena for temporary allocations
        /// needed by scalar evaluation.
        ///
        /// An `Ok` result will either be `None` if any predicate did not
        /// evaluate to `Datum::True`, or the values of the columns listed
        /// by `self.projection` if all predicates passed. If an error
        /// occurs in the evaluation it is returned as an `Err` variant.
        /// As the evaluation exits early with failed predicates, it may
        /// miss some errors that would occur later in evaluation.
        ///
        /// The `row` is not cleared first, but emptied if the function
        /// returns `Ok(Some(row)).
        #[inline(always)]
        pub fn evaluate_into<'a>(
            &'a self,
            datums: &mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            row: &'a mut Row,
        ) -> Result<Option<Row>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                row.clear();
                row.extend(self.mfp.projection.iter().map(|c| datums[*c]));
                Ok(Some(row.finish_and_reuse()))
            }
        }

        /// A version of `evaluate` which produces an iterator over `Datum`
        /// as output.
        ///
        /// This version can be useful when one wants to capture the resulting
        /// datums without packing and then unpacking a row.
        #[inline(always)]
        pub fn evaluate_iter<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
            let passed_predicates = self.evaluate_inner(datums, arena)?;
            if !passed_predicates {
                Ok(None)
            } else {
                Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
            }
        }

        /// Populates `datums` with `self.expressions` and tests `self.predicates`.
        ///
        /// This does not apply `self.projection`, which is up to the calling method.
        pub fn evaluate_inner<'b, 'a: 'b>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
        ) -> Result<bool, EvalError> {
            let mut expression = 0;
            for (support, predicate) in self.mfp.predicates.iter() {
                while self.mfp.input_arity + expression < *support {
                    datums.push(self.mfp.expressions[expression].eval(&datums[..], &arena)?);
                    expression += 1;
                }
                if predicate.eval(&datums[..], &arena)? != Datum::True {
                    return Ok(false);
                }
            }
            while expression < self.mfp.expressions.len() {
                datums.push(self.mfp.expressions[expression].eval(&datums[..], &arena)?);
                expression += 1;
            }
            Ok(true)
        }
    }

    impl std::ops::Deref for SafeMfpPlan {
        type Target = MapFilterProject;
        fn deref(&self) -> &Self::Target {
            &self.mfp
        }
    }

    /// Predicates partitioned into temporal and non-temporal.
    ///
    /// Temporal predicates require some recognition to determine their
    /// structure, and it is best to do that once and re-use the results.
    ///
    /// There are restrictions on the temporal predicates we currently support.
    /// They must directly constrain `MzLogicalTimestamp` from below or above,
    /// by expressions that do not themselves contain `MzLogicalTimestamp`.
    /// Conjunctions of such constraints are also ok.
    #[derive(Clone, Debug)]
    pub struct MfpPlan {
        /// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
        mfp: SafeMfpPlan,
        /// Expressions that when evaluated lower-bound `MzLogicalTimestamp`.
        lower_bounds: Vec<MirScalarExpr>,
        /// Expressions that when evaluated upper-bound `MzLogicalTimestamp`.
        upper_bounds: Vec<MirScalarExpr>,
    }

    impl MfpPlan {
        /// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
        ///
        /// The first returned list is of predicates that do not contain `mz_logical_timestamp`.
        /// The second and third returned lists contain expressions that, once evaluated, lower
        /// and upper bound the validity interval of a record, respectively. These second two
        /// lists are populared only by binary expressions of the form
        /// ```ignore
        /// mz_logical_timestamp cmp_op expr
        /// ```
        /// where `cmp_op` is a comparison operator and `expr` does not contain `mz_logical_timestamp`.
        ///
        /// If any unsupported expression is found, for example one that uses `mz_logical_timestamp`
        /// in an unsupported position, an error is returned.
        pub(crate) fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
            let mut lower_bounds = Vec::new();
            let mut upper_bounds = Vec::new();

            let mut temporal = Vec::new();

            // Optimize, to ensure that temporal predicates are move in to `mfp.predicates`.
            mfp.optimize();

            mfp.predicates.retain(|(_position, predicate)| {
                if predicate.contains_temporal() {
                    temporal.push(predicate.clone());
                    false
                } else {
                    true
                }
            });

            for predicate in temporal.into_iter() {
                // Supported temporal predicates are exclusively binary operators.
                if let MirScalarExpr::CallBinary {
                    mut func,
                    mut expr1,
                    mut expr2,
                } = predicate
                {
                    // Attempt to put `MzLogicalTimestamp` in the first argument position.
                    if !expr1.contains_temporal()
                        && *expr2 == MirScalarExpr::CallNullary(NullaryFunc::MzLogicalTimestamp)
                    {
                        std::mem::swap(&mut expr1, &mut expr2);
                        func = match func {
                            BinaryFunc::Eq => BinaryFunc::Eq,
                            BinaryFunc::Lt => BinaryFunc::Gt,
                            BinaryFunc::Lte => BinaryFunc::Gte,
                            BinaryFunc::Gt => BinaryFunc::Lt,
                            BinaryFunc::Gte => BinaryFunc::Lte,
                            x => {
                                return Err(format!(
                                    "Unsupported binary temporal operation: {:?}",
                                    x
                                ));
                            }
                        };
                    }

                    // Error if MLT is referenced in an unsuppported position.
                    if expr2.contains_temporal()
                        || *expr1 != MirScalarExpr::CallNullary(NullaryFunc::MzLogicalTimestamp)
                    {
                        return Err(format!(
                            "Unsupported temporal predicate. Note: `mz_logical_timestamp()` must be directly compared to a numeric non-temporal expression; if it is compared to a non-numeric type, consider casting that type to numeric. Expression found: {:?}",
                            MirScalarExpr::CallBinary { func, expr1, expr2 },
                            ));
                    }

                    // We'll need to use this a fair bit.
                    let decimal_one = MirScalarExpr::literal_ok(
                        Datum::from(Numeric::from(1)),
                        ScalarType::Numeric { scale: None },
                    );

                    // TODO(#7611): Truncate any significant fractional digits
                    // from positive values of `expr2`, while keeping the sign
                    // of negative values intact. (Negative values are never
                    // chosen as valid bounds, so their resultant values matter
                    // little, but we do want to know they were negative.)
                    let expr2_floor = expr2.call_unary(UnaryFunc::FloorNumeric(func::FloorNumeric));

                    // MzLogicalTimestamp <OP> <EXPR2> for several supported operators.
                    match func {
                        BinaryFunc::Eq => {
                            // Lower bound of expr, upper bound of expr+1
                            lower_bounds.push((expr2_floor).clone());
                            upper_bounds
                                .push(expr2_floor.call_binary(decimal_one, BinaryFunc::AddNumeric));
                        }
                        BinaryFunc::Lt => {
                            upper_bounds.push(expr2_floor);
                        }
                        BinaryFunc::Lte => {
                            upper_bounds
                                .push(expr2_floor.call_binary(decimal_one, BinaryFunc::AddNumeric));
                        }
                        BinaryFunc::Gt => {
                            lower_bounds
                                .push(expr2_floor.call_binary(decimal_one, BinaryFunc::AddNumeric));
                        }
                        BinaryFunc::Gte => {
                            lower_bounds.push(expr2_floor);
                        }
                        _ => {
                            return Err(format!(
                                "Unsupported binary temporal operation: {:?}",
                                func
                            ));
                        }
                    }
                } else {
                    return Err(format!(
                        "Unsupported temporal predicate. Note: `mz_logical_timestamp()` must be directly compared to a numeric non-temporal expression; if it is compared to a non-numeric type, consider casting that type to numeric. Expression found: {:?}",
                        predicate,
                        ));
                }
            }

            Ok(Self {
                mfp: SafeMfpPlan { mfp },
                lower_bounds,
                upper_bounds,
            })
        }

        /// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
        pub fn is_identity(&self) -> bool {
            self.mfp.mfp.is_identity()
                && self.lower_bounds.is_empty()
                && self.upper_bounds.is_empty()
        }

        /// Attempt to convert self into a non-temporal MapFilterProject plan.
        ///
        /// If that is not possible, the original instance is returned as an error.
        pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
            if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
                Ok(self.mfp)
            } else {
                Err(self)
            }
        }

        /// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
        ///
        /// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
        /// or an evaluation error. If `self contains temporal predicates, the results can be times
        /// that are greater than the input `time`, and may contain negated `diff` values.
        ///
        /// The `row_builder` is not cleared first, but emptied if the function
        /// returns an iterator with any `Ok(_)` element.
        pub fn evaluate<'b, 'a: 'b, E: From<EvalError>>(
            &'a self,
            datums: &'b mut Vec<Datum<'a>>,
            arena: &'a RowArena,
            time: repr::Timestamp,
            diff: Diff,
            row_builder: &mut Row,
        ) -> impl Iterator<Item = Result<(Row, repr::Timestamp, Diff), (E, repr::Timestamp, Diff)>>
        {
            match self.mfp.evaluate_inner(datums, &arena) {
                Err(e) => {
                    return Some(Err((e.into(), time, diff)))
                        .into_iter()
                        .chain(None.into_iter());
                }
                Ok(true) => {}
                Ok(false) => {
                    return None.into_iter().chain(None.into_iter());
                }
            }

            // Lower and upper bounds. If set, the value indicates the respective
            // bound. If not set, indicates "larger than `u64::MAX`".
            let mut lower_bound = Some(time);
            let mut upper_bound = None;

            // Track whether we have seen a null in either bound, as this should
            // prevent the record from being produced at any time.
            let mut null_eval = false;

            // Advance our lower bound to be at least the result of any lower bound
            // expressions.
            for l in self.lower_bounds.iter() {
                // If the lower bound already exceeds a `u64`, we needn't check more.
                if lower_bound.is_some() {
                    match l.eval(datums, &arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff)))
                                .into_iter()
                                .chain(None.into_iter());
                        }
                        Ok(Datum::Numeric(d)) => {
                            // If the number is negative, it cannot improve the initial
                            // value of `Some(time)`.
                            if !d.0.is_negative() {
                                // An `Ok` conversion is a valid upper bound, and an `Err` error
                                // indicates a value above `u64::MAX`. The `ok()` method does the
                                // conversion we want to an `Option<u64>`.
                                let v = u64::try_from(d.0).ok();
                                // Update `lower_bound` to be the maximum with `v`, where a `None`
                                // value is treated as larger than `Some(_)` values.
                                lower_bound = match (lower_bound, v) {
                                    (None, _) => None,
                                    (_, None) => None,
                                    (x, y) => x.max(y),
                                };
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-decimal value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // If the lower bound exceeds `u64::MAX` the update cannot appear in the output.
            if lower_bound.is_none() {
                return None.into_iter().chain(None.into_iter());
            }

            // If there are any upper bounds, determine the minimum upper bound.
            for u in self.upper_bounds.iter() {
                // We can cease as soon as the lower and upper bounds match,
                // as the update will certainly not be produced in that case.
                if upper_bound != lower_bound {
                    match u.eval(datums, &arena) {
                        Err(e) => {
                            return Some(Err((e.into(), time, diff)))
                                .into_iter()
                                .chain(None.into_iter());
                        }
                        Ok(Datum::Numeric(d)) => {
                            // If the upper bound is negative, it is impossible to satisfy.
                            // We set the upper bound to the lower bound to indicate this.
                            if d.0.is_negative() {
                                upper_bound = lower_bound;
                            } else {
                                // An `Ok` conversion is a valid upper bound, and an `Err` error
                                // indicates a value above `u64::MAX`. The `ok()` method does the
                                // conversion we want to an `Option<u64>`.
                                let v = u64::try_from(d.0).ok();
                                // Update `upper_bound` to be the minimum with `v`, where a `None`
                                // value is treated as larger than `Some(_)` values.
                                upper_bound = match (upper_bound, v) {
                                    (None, x) => x,
                                    (x, None) => x,
                                    (x, y) => x.min(y),
                                };
                                // Force the upper bound to be at least the lower bound.
                                // The `is_some()` test is required as `None < Some(_)`,
                                // and we do not want to set `None` to `Some(_)`.
                                if upper_bound.is_some() && upper_bound < lower_bound {
                                    upper_bound = lower_bound;
                                }
                            }
                        }
                        Ok(Datum::Null) => {
                            null_eval = true;
                        }
                        x => {
                            panic!("Non-decimal value in temporal predicate: {:?}", x);
                        }
                    }
                }
            }

            // Produce an output only if the upper bound exceeds the lower bound,
            // and if we did not encounter a `null` in our evaluation.
            if lower_bound != upper_bound && !null_eval {
                row_builder.clear();
                row_builder.extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
                let (lower_opt, upper_opt) = match (lower_bound, upper_bound) {
                    (Some(lower_bound), Some(upper_bound)) => (
                        Some(Ok((row_builder.clone(), lower_bound, diff))),
                        Some(Ok((row_builder.finish_and_reuse(), upper_bound, -diff))),
                    ),
                    (Some(lower_bound), None) => (
                        Some(Ok((row_builder.finish_and_reuse(), lower_bound, diff))),
                        None,
                    ),
                    (None, Some(upper_bound)) => (
                        None,
                        Some(Ok((row_builder.finish_and_reuse(), upper_bound, -diff))),
                    ),
                    _ => (None, None),
                };
                lower_opt.into_iter().chain(upper_opt.into_iter())
            } else {
                None.into_iter().chain(None.into_iter())
            }
        }
    }
}