pub struct ColumnMergeBatcher<D, T, R>{
chains: Vec<VecDeque<PagedColumn<(D, T, R)>>>,
lower: Antichain<T>,
frontier: Antichain<T>,
stash: Vec<Column<(D, T, R)>>,
pager_override: Option<ColumnPager>,
logger: Option<Logger>,
operator_id: usize,
}Expand description
Drives the merge-batcher over Column chunks routed through a
ColumnPager.
Chains hold PagedColumn entries rather than resident Columns, so
each insert / merge / extract step can hand its output to the pager and
store whatever the policy returns (resident, paged, or compressed). Reads
during merge materialize lazily via FetchIter.
Resolves its pager lazily per call via column_pager::global_pager, so
late-arriving dyncfg updates (e.g. enable_column_paged_batcher flipping
on after the batcher was constructed) take effect without rebuilding the
operator. Tests may override that lookup via Self::set_pager.
Fields§
§chains: Vec<VecDeque<PagedColumn<(D, T, R)>>>§lower: Antichain<T>§frontier: Antichain<T>§stash: Vec<Column<(D, T, R)>>Recycled empty Column::Typed chunks. Drained heads and shipped result
buffers feed in here; subsequent merge / extract calls pop from here
instead of starting from a zero-capacity Column::default(). Mirrors
the stash carried by the upstream differential_dataflow merge-batcher
framework, which this type forks. Without it, each shipped chunk
triggers a fresh per-leaf grow cycle and per-merge-round allocation
dominates the inner loop.
pager_override: Option<ColumnPager>Optional override. None means “read column_pager::global_pager
fresh on every use” — the production path, so worker_config dyncfg
changes that re-install the process-global pager take effect on the
very next chunk this batcher processes.
logger: Option<Logger>§operator_id: usizeImplementations§
Source§impl<D, T, R> ColumnMergeBatcher<D, T, R>
impl<D, T, R> ColumnMergeBatcher<D, T, R>
Sourcepub fn set_pager(&mut self, pager: ColumnPager)
pub fn set_pager(&mut self, pager: ColumnPager)
Pin the pager this batcher uses, overriding the thread-local lookup. Mainly for tests; production should leave the override unset so dyncfg-driven re-installs take effect immediately.
Sourcefn pager(&self) -> ColumnPager
fn pager(&self) -> ColumnPager
Current pager — override if set, else the process-global pager
installed by apply_worker_config. ColumnPager is cheaply
cloneable (Arc inside).
Sourcefn chain_push(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>)
fn chain_push(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>)
Push a chain into self.chains, emitting a positive BatcherEvent
covering its resident entries.
Sourcefn chain_pop(&mut self) -> Option<VecDeque<PagedColumn<(D, T, R)>>>
fn chain_pop(&mut self) -> Option<VecDeque<PagedColumn<(D, T, R)>>>
Pop a chain from self.chains, emitting a negative BatcherEvent
retracting its resident entries.
Invariant for the retract to reconcile against the matching
chain_push: chain entries are never mutated in place between push
and pop. The only allowed mutation is a full pop / push pair (see
insert_chain and merge_by), so each entry’s accounting category
— Resident vs Paged vs Compressed — is the same at both ends.
If a future change ever pages an entry out in place after push, this
path silently double-counts.
Sourcefn emit_account(&self, chain: &VecDeque<PagedColumn<(D, T, R)>>, diff: isize)
fn emit_account(&self, chain: &VecDeque<PagedColumn<(D, T, R)>>, diff: isize)
Emit a single BatcherEvent summing resident accounting across
chain with the given sign. No-op when no logger is attached.
Source§impl<D, T, R> ColumnMergeBatcher<D, T, R>
impl<D, T, R> ColumnMergeBatcher<D, T, R>
Sourcefn insert_chain(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>)
fn insert_chain(&mut self, chain: VecDeque<PagedColumn<(D, T, R)>>)
Insert chain and rebalance: while the youngest chain is at least
half the size of its predecessor, merge them.
Sourcefn merge_by(
&mut self,
a: VecDeque<PagedColumn<(D, T, R)>>,
b: VecDeque<PagedColumn<(D, T, R)>>,
) -> VecDeque<PagedColumn<(D, T, R)>>
fn merge_by( &mut self, a: VecDeque<PagedColumn<(D, T, R)>>, b: VecDeque<PagedColumn<(D, T, R)>>, ) -> VecDeque<PagedColumn<(D, T, R)>>
Merge two sorted chains. Outputs are routed through self.pager.page
per chunk produced, so the result chain holds PagedColumns and the
caller never sees a fully materialized merge result.
Trait Implementations§
Source§impl<D, T, R> Batcher for ColumnMergeBatcher<D, T, R>
impl<D, T, R> Batcher for ColumnMergeBatcher<D, T, R>
Source§fn seal(
&mut self,
upper: Antichain<Self::Time>,
) -> (Vec<Self::Output>, Description<Self::Time>)
fn seal( &mut self, upper: Antichain<Self::Time>, ) -> (Vec<Self::Output>, Description<Self::Time>)
upper, as a sorted and
consolidated chain together with the description that bounds them. Read moreSource§fn frontier(&mut self) -> AntichainRef<'_, Self::Time>
fn frontier(&mut self) -> AntichainRef<'_, Self::Time>
Source§impl<D, T, R> Drop for ColumnMergeBatcher<D, T, R>
impl<D, T, R> Drop for ColumnMergeBatcher<D, T, R>
Auto Trait Implementations§
impl<D, T, R> Freeze for ColumnMergeBatcher<D, T, R>where
T: Freeze,
impl<D, T, R> !RefUnwindSafe for ColumnMergeBatcher<D, T, R>
impl<D, T, R> !Send for ColumnMergeBatcher<D, T, R>
impl<D, T, R> !Sync for ColumnMergeBatcher<D, T, R>
impl<D, T, R> Unpin for ColumnMergeBatcher<D, T, R>
impl<D, T, R> UnsafeUnpin for ColumnMergeBatcher<D, T, R>where
T: UnsafeUnpin,
impl<D, T, R> !UnwindSafe for ColumnMergeBatcher<D, T, R>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::RequestSource§impl<T> Paint for Twhere
T: ?Sized,
impl<T> Paint for Twhere
T: ?Sized,
Source§fn fg(&self, value: Color) -> Painted<&T>
fn fg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the foreground set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like red() and
green(), which have the same functionality but are
pithier.
§Example
Set foreground color to white using fg():
use yansi::{Paint, Color};
painted.fg(Color::White);Set foreground color to white using white().
use yansi::Paint;
painted.white();Source§fn bright_black(&self) -> Painted<&T>
fn bright_black(&self) -> Painted<&T>
Source§fn bright_red(&self) -> Painted<&T>
fn bright_red(&self) -> Painted<&T>
Source§fn bright_green(&self) -> Painted<&T>
fn bright_green(&self) -> Painted<&T>
Source§fn bright_yellow(&self) -> Painted<&T>
fn bright_yellow(&self) -> Painted<&T>
Source§fn bright_blue(&self) -> Painted<&T>
fn bright_blue(&self) -> Painted<&T>
Source§fn bright_magenta(&self) -> Painted<&T>
fn bright_magenta(&self) -> Painted<&T>
Source§fn bright_cyan(&self) -> Painted<&T>
fn bright_cyan(&self) -> Painted<&T>
Source§fn bright_white(&self) -> Painted<&T>
fn bright_white(&self) -> Painted<&T>
Source§fn bg(&self, value: Color) -> Painted<&T>
fn bg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the background set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like on_red() and
on_green(), which have the same functionality but
are pithier.
§Example
Set background color to red using fg():
use yansi::{Paint, Color};
painted.bg(Color::Red);Set background color to red using on_red().
use yansi::Paint;
painted.on_red();Source§fn on_primary(&self) -> Painted<&T>
fn on_primary(&self) -> Painted<&T>
Source§fn on_magenta(&self) -> Painted<&T>
fn on_magenta(&self) -> Painted<&T>
Source§fn on_bright_black(&self) -> Painted<&T>
fn on_bright_black(&self) -> Painted<&T>
Source§fn on_bright_red(&self) -> Painted<&T>
fn on_bright_red(&self) -> Painted<&T>
Source§fn on_bright_green(&self) -> Painted<&T>
fn on_bright_green(&self) -> Painted<&T>
Source§fn on_bright_yellow(&self) -> Painted<&T>
fn on_bright_yellow(&self) -> Painted<&T>
Source§fn on_bright_blue(&self) -> Painted<&T>
fn on_bright_blue(&self) -> Painted<&T>
Source§fn on_bright_magenta(&self) -> Painted<&T>
fn on_bright_magenta(&self) -> Painted<&T>
Source§fn on_bright_cyan(&self) -> Painted<&T>
fn on_bright_cyan(&self) -> Painted<&T>
Source§fn on_bright_white(&self) -> Painted<&T>
fn on_bright_white(&self) -> Painted<&T>
Source§fn attr(&self, value: Attribute) -> Painted<&T>
fn attr(&self, value: Attribute) -> Painted<&T>
Enables the styling Attribute value.
This method should be used rarely. Instead, prefer to use
attribute-specific builder methods like bold() and
underline(), which have the same functionality
but are pithier.
§Example
Make text bold using attr():
use yansi::{Paint, Attribute};
painted.attr(Attribute::Bold);Make text bold using using bold().
use yansi::Paint;
painted.bold();Source§fn rapid_blink(&self) -> Painted<&T>
fn rapid_blink(&self) -> Painted<&T>
Source§fn quirk(&self, value: Quirk) -> Painted<&T>
fn quirk(&self, value: Quirk) -> Painted<&T>
Enables the yansi Quirk value.
This method should be used rarely. Instead, prefer to use quirk-specific
builder methods like mask() and
wrap(), which have the same functionality but are
pithier.
§Example
Enable wrapping using .quirk():
use yansi::{Paint, Quirk};
painted.quirk(Quirk::Wrap);Enable wrapping using wrap().
use yansi::Paint;
painted.wrap();Source§fn clear(&self) -> Painted<&T>
👎Deprecated since 1.0.1: renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
fn clear(&self) -> Painted<&T>
renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
Source§fn whenever(&self, value: Condition) -> Painted<&T>
fn whenever(&self, value: Condition) -> Painted<&T>
Conditionally enable styling based on whether the Condition value
applies. Replaces any previous condition.
See the crate level docs for more details.
§Example
Enable styling painted only when both stdout and stderr are TTYs:
use yansi::{Paint, Condition};
painted.red().on_yellow().whenever(Condition::STDOUTERR_ARE_TTY);Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign, for types that do not implement AddAssign.