fn suppress_early_progress<G, D>(
    stream: Stream<G, D>,
    as_of: Antichain<G::Timestamp>
) -> Stream<G, D>
where
    G: Scope,
    D: Data,

Suppress progress messages for times before the given as_of.

This operator exists specifically to work around a memory spike we’d otherwise see when hydrating arrangements (#21165). The spike happens because, when the arrange_core operator observes a frontier advancement without data, it inserts an empty batch into the spine. When it later inserts the snapshot batch into the spine, the empty batch is already there, and the spine initiates a merge of the two batches, which requires allocating a new batch the size of the snapshot batch.
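To make the cost concrete, here is a toy model of the merge described above. `ToySpine` is a hypothetical stand-in, not differential dataflow's real spine (which schedules merges by size-based levels): it simply merges any newly inserted batch with the batch already present and counts how many updates are copied into the newly allocated merged batch.

```rust
/// Hypothetical toy spine: a list of batches of updates. Unlike the real
/// spine, it merges every inserted batch with the one already present.
struct ToySpine {
    batches: Vec<Vec<u64>>,
    /// Number of updates copied into newly allocated merged batches.
    merge_allocations: usize,
}

impl ToySpine {
    fn new() -> Self {
        ToySpine { batches: Vec::new(), merge_allocations: 0 }
    }

    /// Inserts a batch; if a batch is already present, both are merged
    /// into a freshly allocated batch, as in the scenario described above.
    fn insert(&mut self, batch: Vec<u64>) {
        match self.batches.pop() {
            Some(existing) => {
                let mut merged = Vec::with_capacity(existing.len() + batch.len());
                merged.extend(existing);
                merged.extend(batch);
                self.merge_allocations += merged.len();
                self.batches.push(merged);
            }
            None => self.batches.push(batch),
        }
    }
}

fn main() {
    let snapshot: Vec<u64> = (0..1_000).collect();

    // Without suppression: an empty batch is inserted before the snapshot.
    let mut spine = ToySpine::new();
    spine.insert(Vec::new());
    spine.insert(snapshot.clone());
    println!("with empty batch: {} updates copied", spine.merge_allocations);

    // With suppression: the snapshot is the first batch, so nothing merges.
    let mut spine = ToySpine::new();
    spine.insert(snapshot);
    println!("without empty batch: {} updates copied", spine.merge_allocations);
}
```

In this model the first run copies all 1000 snapshot updates into a new batch while the second copies none; the real allocation pattern depends on the spine's merge policy.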

The strategy to avoid the spike is to prevent the insertion of that initial empty batch by ensuring that the first frontier advancement that downstream arrange_core operators observe is beyond the as_of, at which point the snapshot data has already been collected.

To ensure this, the operator takes two measures:

  • Keep around a minimum capability until the input announces progress beyond the as_of.
  • Reclock all updates emitted at times not beyond the as_of to the minimum time.
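The two measures can be sketched as a small state machine outside of timely. This is a hypothetical model, not the operator's actual implementation: timestamps are assumed to be totally ordered `u64`s with minimum 0, the as_of is a single time rather than an `Antichain`, "beyond the as_of" is taken to mean strictly greater than it, the minimum capability is an `Option<u64>`, and `None` stands for the empty frontier.

```rust
/// Hypothetical model of the operator's two measures, using totally
/// ordered `u64` timestamps whose minimum is 0. `None` stands for the
/// empty frontier, and "beyond the as_of" means strictly greater than it.
struct SuppressEarlyProgress {
    as_of: u64,
    /// Stand-in for the minimum capability: `Some(0)` while it is held.
    early_cap: Option<u64>,
    /// Most recently observed input frontier.
    input_frontier: Option<u64>,
}

impl SuppressEarlyProgress {
    fn new(as_of: u64) -> Self {
        SuppressEarlyProgress { as_of, early_cap: Some(0), input_frontier: Some(0) }
    }

    /// Measure 2: updates at times not beyond the as_of are reclocked
    /// to the time of the minimum capability.
    fn reclock(&self, time: u64) -> u64 {
        if time > self.as_of {
            time
        } else {
            // Such updates can only arrive while the input frontier has not
            // passed the as_of, so the capability is still held here.
            self.early_cap.expect("minimum capability dropped too early")
        }
    }

    /// Measure 1: keep the minimum capability until the input frontier
    /// advances beyond the as_of, then drop it for good.
    fn observe_input_frontier(&mut self, frontier: Option<u64>) {
        self.input_frontier = frontier;
        let beyond = frontier.map_or(true, |t| t > self.as_of);
        if beyond {
            self.early_cap = None;
        }
    }

    /// Output frontier as seen downstream, ignoring data still in flight.
    fn output_frontier(&self) -> Option<u64> {
        match (self.early_cap, self.input_frontier) {
            (Some(cap), Some(t)) => Some(cap.min(t)),
            (Some(cap), None) => Some(cap),
            (None, f) => f,
        }
    }
}

fn main() {
    let mut op = SuppressEarlyProgress::new(10);
    // A snapshot update at the as_of is emitted at the minimum time.
    assert_eq!(op.reclock(10), 0);
    // The input frontier reaches the as_of: the capability is still held.
    op.observe_input_frontier(Some(10));
    assert_eq!(op.output_frontier(), Some(0));
    // The input frontier passes the as_of: the capability is dropped.
    op.observe_input_frontier(Some(11));
    assert_eq!(op.output_frontier(), Some(11));
    // Later updates keep their own times.
    assert_eq!(op.reclock(12), 12);
}
```

Because the capability is held at the minimum time, the output frontier stays at 0 until the input passes the as_of and then jumps straight past it, which is exactly the single advancement downstream operators should see.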

The second measure requires elaboration: if we didn’t reclock snapshot updates, they might still be upstream of the arrange_core operators when those operators learn that we have dropped the minimum capability. The in-flight snapshot updates would then hold the input frontiers of the arrange_core operators back at the as_of, so those operators would observe a frontier advancement to the as_of without data, which would cause them to insert empty batches.
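The in-flight argument can be checked with a second toy model. It assumes that the frontier a downstream arrange_core operator observes is the minimum of the upstream output frontier and the times of updates still in flight, that all snapshot updates sit at the as_of unless reclocked, and that timestamps are `u64`s with `None` as the empty frontier; `downstream_frontier` and `premature_advance` are hypothetical helpers.

```rust
/// Hypothetical: the frontier seen downstream is the minimum of the
/// upstream output frontier and the times of updates still in flight.
fn downstream_frontier(output_frontier: Option<u64>, in_flight: &[u64]) -> Option<u64> {
    let min_in_flight = in_flight.iter().copied().min();
    match (output_frontier, min_in_flight) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (Some(a), None) => Some(a),
        (None, b) => b,
    }
}

/// True if the frontier advanced without passing the as_of. With all
/// snapshot updates at the as_of, that advance seals an interval without
/// data, which is when arrange_core would insert an empty batch.
fn premature_advance(before: Option<u64>, after: Option<u64>, as_of: u64) -> bool {
    match (before, after) {
        (Some(b), Some(a)) => a > b && a <= as_of,
        _ => false,
    }
}

fn main() {
    let as_of = 10;
    for (label, snapshot_time) in [("not reclocked", as_of), ("reclocked", 0)] {
        // Snapshot updates still in flight when the capability is dropped.
        let in_flight = vec![snapshot_time; 3];
        // While the minimum capability (time 0) is held.
        let before = downstream_frontier(Some(0), &in_flight);
        // Right after it is dropped, with the input frontier at as_of + 1.
        let after = downstream_frontier(Some(as_of + 1), &in_flight);
        // Prints "true" for "not reclocked" and "false" for "reclocked".
        println!("{label}: premature advance = {}", premature_advance(before, after, as_of));
    }
}
```

Reclocked updates sit at time 0, so while they are in flight they pin the downstream frontier at the minimum; once delivered, the frontier moves directly beyond the as_of.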