
Tries to convert a reduce around a join to a join of reduces. Also absorbs Map operators into Reduce operators.

In a traditional DB, this transformation has the potential benefit of reducing the size of the join. In our streaming system built on top of Timely Dataflow and Differential Dataflow, there are three other potential benefits:

  1. Reducing data skew in the arrangements constructed for a join.
  2. The join can potentially reuse the final arrangement constructed for the reduce and not have to construct its own arrangement.
  3. Reducing the frequency with which we have to recalculate the result of a join.

Suppose there are two inputs R and S being joined. According to Galindo-Legaria and Joshi (2001), a full reduction pushdown to R can be done if and only if:

  1. Columns from R involved in join constraints are a subset of the group by keys.
  2. The key of S is a subset of the group by keys.
  3. The columns involved in the aggregation all belong to R.
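The three conditions can be read as plain subset checks over column sets. The following is a minimal sketch of that reading, not the transform's actual code: the `ReduceOverJoin` struct, its fields, the `cols` helper, and the use of string column names are all hypothetical and chosen only for illustration.

```rust
use std::collections::BTreeSet;

/// Hypothetical summary of a reduce around a join of R and S.
/// Columns are identified by name purely for illustration.
struct ReduceOverJoin {
    /// Group by keys of the reduce.
    group_keys: BTreeSet<&'static str>,
    /// Columns of R that appear in join constraints.
    r_join_columns: BTreeSet<&'static str>,
    /// A key of S (columns that uniquely identify a row of S).
    s_key: BTreeSet<&'static str>,
    /// Columns referenced by the aggregate expressions.
    aggregate_columns: BTreeSet<&'static str>,
    /// All columns that belong to R.
    r_columns: BTreeSet<&'static str>,
}

/// Helper that builds a column set from a slice of names.
fn cols(names: &[&'static str]) -> BTreeSet<&'static str> {
    names.iter().copied().collect()
}

/// Returns true when all three conditions listed above hold.
fn full_pushdown_to_r_allowed(q: &ReduceOverJoin) -> bool {
    // Condition 1: join columns of R are a subset of the group by keys.
    let cond1 = q.r_join_columns.is_subset(&q.group_keys);
    // Condition 2: the key of S is a subset of the group by keys.
    let cond2 = q.s_key.is_subset(&q.group_keys);
    // Condition 3: every aggregated column belongs to R.
    let cond3 = q.aggregate_columns.is_subset(&q.r_columns);
    cond1 && cond2 && cond3
}
```

For instance, grouping by `r.k` and `s.id` while aggregating only `r.a` would pass all three checks in this sketch, whereas aggregating a column of S would fail the third.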

In our current implementation:

  • We abide by condition 1 to the letter.
  • We work around condition 2 by rewriting the reduce around a join of R to S as an equivalent relational expression involving a join of R to:
    select <columns involved in join constraints>, count(true)
    from S
    group by <columns involved in join constraints>
  • TODO: Work around condition 3 in some cases by noting that sum(R.a * S.a) is equivalent to sum(R.a) * sum(S.a).
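To see why replacing S with a per-key count can preserve the result, the toy sketch below compares a sum computed over the joined rows with a sum computed per key on R and then scaled by the number of matching S rows. It is an illustration under assumptions, not the transform itself: relations are modeled as in-memory slices, R has the hypothetical shape `(k, a)`, S is reduced to its join column `k`, both sides are reduced before joining for simplicity, and the function names are invented here.

```rust
use std::collections::BTreeMap;

/// Reduce around a join, roughly:
///   select R.k, sum(R.a) from R join S on R.k = S.k group by R.k
/// R is a slice of (k, a) rows; S is a slice of k values (hypothetical shapes).
fn reduce_around_join(r: &[(i64, i64)], s: &[i64]) -> BTreeMap<i64, i64> {
    let mut out = BTreeMap::new();
    for &(rk, a) in r {
        for &sk in s {
            if rk == sk {
                // Each matching (R row, S row) pair contributes `a` once.
                *out.entry(rk).or_insert(0) += a;
            }
        }
    }
    out
}

/// Join of reduces: sum R.a per key, count S rows per key, then join the
/// two small results and multiply the sum by the count.
fn join_of_reduces(r: &[(i64, i64)], s: &[i64]) -> BTreeMap<i64, i64> {
    let mut r_sums: BTreeMap<i64, i64> = BTreeMap::new();
    for &(k, a) in r {
        *r_sums.entry(k).or_insert(0) += a;
    }
    let mut s_counts: BTreeMap<i64, i64> = BTreeMap::new();
    for &k in s {
        *s_counts.entry(k).or_insert(0) += 1;
    }
    // Inner join on the key: keys missing from either side are dropped.
    r_sums
        .iter()
        .filter_map(|(k, sum)| s_counts.get(k).map(|count| (*k, sum * count)))
        .collect()
}
```

In this sketch the join runs over one row per key on each side rather than over every matching pair, which is the kind of reduction in join size and recomputation described above.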

Full documentation with examples can be found here

The current implementation is chosen so that reduction pushdown kicks in only in the subset of cases most likely to help users. In the future, we may allow the user to toggle the aggressiveness of reduction pushdown. A more aggressive reduction pushdown implementation may, for example, try to work around condition 1 by pushing down an inner reduce through the join while retaining the original outer reduce.


Pushes Reduce operators toward sources.