Function mz_compute::render::suppress_early_progress
fn suppress_early_progress<G, D>(
stream: Stream<G, D>,
as_of: Antichain<G::Timestamp>,
) -> Stream<G, D>
Suppress progress messages for times before the given as_of.
This operator exists specifically to work around a memory spike we’d otherwise see when
hydrating arrangements (database-issues#6368). The memory spike happens because when the arrange_core
operator observes a frontier advancement without data it inserts an empty batch into the spine.
When it later inserts the snapshot batch into the spine, an empty batch is already there and
the spine initiates a merge of these batches, which requires allocating a new batch the size of
the snapshot batch.
The strategy to avoid the spike is to prevent the insertion of that initial empty batch by
ensuring that the first frontier advancement downstream arrange_core operators observe is
beyond the as_of, so the snapshot data has already been collected.
To ensure this, this operator needs to take two measures:
- Keep around a minimum capability until the input announces progress beyond the as_of.
- Reclock all updates emitted at times not beyond the as_of to the minimum time.
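The two measures can be sketched with a simplified model. This is not the operator's actual code: it assumes totally ordered u64 times in place of Timely's timestamps and antichains, and the type SuppressModel with its methods emit_time and observe_frontier is hypothetical, introduced only for illustration.

```rust
/// Hypothetical, simplified model of the two measures. Times are plain
/// `u64` values and the minimum time is assumed to be 0.
struct SuppressModel {
    as_of: u64,
    /// `Some(min)` while the minimum capability is still retained.
    min_capability: Option<u64>,
}

impl SuppressModel {
    fn new(as_of: u64) -> Self {
        Self { as_of, min_capability: Some(0) }
    }

    /// Measure 2: updates at times not beyond the as_of are emitted at the
    /// minimum time; later updates keep their own time.
    fn emit_time(&self, time: u64) -> u64 {
        if time > self.as_of {
            time
        } else {
            // Once the input frontier is beyond the as_of, no update at a
            // time not beyond the as_of can arrive anymore.
            self.min_capability
                .expect("minimum capability dropped before early update")
        }
    }

    /// Measure 1: retain the minimum capability until the input frontier is
    /// beyond the as_of. Returns the frontier visible downstream.
    fn observe_frontier(&mut self, input_frontier: u64) -> u64 {
        if input_frontier > self.as_of {
            self.min_capability = None;
        }
        self.min_capability.unwrap_or(input_frontier)
    }
}
```

In this model, downstream operators see the frontier stay at the minimum time until it jumps directly beyond the as_of, so there is no intermediate advancement that could seal an empty batch.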
The second measure requires elaboration: If we did not reclock snapshot updates, they might
still be upstream of arrange_core operators when those learn that we have dropped the
minimum capability. The in-flight snapshot updates would hold back the input frontiers of
arrange_core operators to the as_of, which would cause them to insert empty batches.
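The effect of in-flight updates on a downstream input frontier can be illustrated with a small sketch. It assumes a scalar simplification of Timely's progress tracking, in which the frontier is the minimum over capabilities still held upstream and the times of messages still in flight. The function downstream_frontier is hypothetical and not part of any library.

```rust
/// Hypothetical scalar model: the input frontier of a downstream operator
/// is the smallest time that can still arrive, i.e. the minimum over
/// capabilities held upstream and times of in-flight messages.
/// Returns `None` when nothing can arrive anymore.
fn downstream_frontier(held_capabilities: &[u64], in_flight: &[u64]) -> Option<u64> {
    held_capabilities
        .iter()
        .chain(in_flight.iter())
        .copied()
        .min()
}
```

With an as_of of 5 and a minimum time of 0, an in-flight snapshot update left at time 5 yields a frontier of 5 once the minimum capability is gone: an advancement that is not beyond the as_of and arrives without data. Reclocked to time 0, the same in-flight update keeps the frontier at 0 until it has been delivered.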