pub struct OperatorBuilder<G: Scope> {
builder: OperatorBuilder<G>,
activator: Activator,
operator_waker: Arc<TimelyWaker>,
input_frontiers: Vec<Antichain<G::Timestamp>>,
input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
output_flushes: Vec<Box<dyn FnMut()>>,
shutdown_handle: ButtonHandle,
shutdown_button: Button,
}
Expand description
Builds async operators with generic shape.
Fields§
§builder: OperatorBuilder<G>
§activator: Activator
The activator for this operator
operator_waker: Arc<TimelyWaker>
The waker set up to activate this timely operator when woken
input_frontiers: Vec<Antichain<G::Timestamp>>
The currently known upper frontier of each of the input handles.
input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>
Input queues for each of the declared inputs of the operator.
output_flushes: Vec<Box<dyn FnMut()>>
Holds type erased closures that flush an output handle when called. These handles will be automatically drained when the operator is scheduled after the logic future has been polled
shutdown_handle: ButtonHandle
A handle to check whether all workers have pressed the shutdown button.
A button to coordinate shutdown of this operator among workers.
Implementations§
Source§impl<G: Scope> OperatorBuilder<G>
impl<G: Scope> OperatorBuilder<G>
Sourcepub fn new(name: String, scope: G) -> Self
pub fn new(name: String, scope: G) -> Self
Allocates a new generic async operator builder from its containing scope.
Sourcepub fn new_input_for<D, P>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
output: &dyn OutputIndex,
) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
pub fn new_input_for<D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, output: &dyn OutputIndex, ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
Adds a new input that is connected to the specified output, returning the async input handle to use.
Sourcepub fn new_input_for_many<const N: usize, D, P>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
outputs: [&dyn OutputIndex; N],
) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
pub fn new_input_for_many<const N: usize, D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, outputs: [&dyn OutputIndex; N], ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
Adds a new input that is connected to the specified outputs, returning the async input handle to use.
Sourcepub fn new_disconnected_input<D, P>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
pub fn new_disconnected_input<D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
Adds a new input that is not connected to any output, returning the async input handle to use.
Sourcepub fn new_input_connection<D, P, C>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
connection: C,
) -> AsyncInputHandle<G::Timestamp, D, C>where
D: Container + 'static,
P: ParallelizationContract<G::Timestamp, D>,
C: InputConnection<G::Timestamp> + 'static,
pub fn new_input_connection<D, P, C>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
connection: C,
) -> AsyncInputHandle<G::Timestamp, D, C>where
D: Container + 'static,
P: ParallelizationContract<G::Timestamp, D>,
C: InputConnection<G::Timestamp> + 'static,
Adds a new input with connection information, returning the async input handle to use.
Sourcepub fn new_output<CB: ContainerBuilder>(
&mut self,
) -> (AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>)
pub fn new_output<CB: ContainerBuilder>( &mut self, ) -> (AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>)
Adds a new output, returning the output handle and stream.
Sourcepub fn build<B, L>(self, constructor: B) -> Button
pub fn build<B, L>(self, constructor: B) -> Button
Creates an operator implementation from supplied logic constructor. It returns a shutdown
button that when pressed it will cause the logic future to be dropped and input handles to
be drained. The button can be converted into a token by using
Button::press_on_drop
Sourcepub fn build_fallible<E: 'static, F>(
self,
constructor: F,
) -> (Button, StreamCore<G, Vec<Rc<E>>>)
pub fn build_fallible<E: 'static, F>( self, constructor: F, ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
Creates a fallible operator implementation from supplied logic constructor. If the Future
resolves to an error it will be emitted in the returned error stream and then the operator
will wait indefinitely until the shutdown button is pressed.
§Capability handling
Unlike OperatorBuilder::build
, this method does not give owned capabilities to the
constructor. All initial capabilities are wrapped in a CapabilitySet
and a mutable
reference to them is given instead. This is done to avoid storing owned capabilities in the
state of the logic future which would make using the ?
operator unsafe, since the
frontiers would incorrectly advance, potentially causing incorrect actions downstream.
builder.build_fallible(|caps| Box::pin(async move {
// Assert that we have the number of capabilities we expect
// `cap` will be a `&mut Option<Capability<T>>`:
let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
// Using cap to send data:
output.give(&cap_set[0], 42);
// Using cap_set to downgrade it:
cap_set.downgrade([]);
// Explicitly dropping the capability:
// Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
*cap_set = CapabilitySet::new();
// !! BIG WARNING !!:
// It is tempting to `take` the capability out of the set for convenience. This will
// move the capability into the future state, tying its lifetime to it, which will get
// dropped when an error is hit, causing incorrect progress statements.
let cap = cap_set.delayed(&Timestamp::minimum());
*cap_set = CapabilitySet::new(); // DO NOT DO THIS
}));
Sourcepub fn operator_info(&self) -> OperatorInfo
pub fn operator_info(&self) -> OperatorInfo
Creates operator info for the operator.
Auto Trait Implementations§
impl<G> Freeze for OperatorBuilder<G>where
G: Freeze,
impl<G> !RefUnwindSafe for OperatorBuilder<G>
impl<G> !Send for OperatorBuilder<G>
impl<G> !Sync for OperatorBuilder<G>
impl<G> Unpin for OperatorBuilder<G>
impl<G> !UnwindSafe for OperatorBuilder<G>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
Source§impl<T> Paint for Twhere
T: ?Sized,
impl<T> Paint for Twhere
T: ?Sized,
Source§fn fg(&self, value: Color) -> Painted<&T>
fn fg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self
with the foreground set to
value
.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like red()
and
green()
, which have the same functionality but are
pithier.
§Example
Set foreground color to white using fg()
:
use yansi::{Paint, Color};
painted.fg(Color::White);
Set foreground color to white using white()
.
use yansi::Paint;
painted.white();
Source§fn bright_black(&self) -> Painted<&T>
fn bright_black(&self) -> Painted<&T>
Source§fn bright_red(&self) -> Painted<&T>
fn bright_red(&self) -> Painted<&T>
Source§fn bright_green(&self) -> Painted<&T>
fn bright_green(&self) -> Painted<&T>
Source§fn bright_yellow(&self) -> Painted<&T>
fn bright_yellow(&self) -> Painted<&T>
Source§fn bright_blue(&self) -> Painted<&T>
fn bright_blue(&self) -> Painted<&T>
Source§fn bright_magenta(&self) -> Painted<&T>
fn bright_magenta(&self) -> Painted<&T>
Source§fn bright_cyan(&self) -> Painted<&T>
fn bright_cyan(&self) -> Painted<&T>
Source§fn bright_white(&self) -> Painted<&T>
fn bright_white(&self) -> Painted<&T>
Source§fn bg(&self, value: Color) -> Painted<&T>
fn bg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self
with the background set to
value
.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like on_red()
and
on_green()
, which have the same functionality but
are pithier.
§Example
Set background color to red using fg()
:
use yansi::{Paint, Color};
painted.bg(Color::Red);
Set background color to red using on_red()
.
use yansi::Paint;
painted.on_red();
Source§fn on_primary(&self) -> Painted<&T>
fn on_primary(&self) -> Painted<&T>
Source§fn on_magenta(&self) -> Painted<&T>
fn on_magenta(&self) -> Painted<&T>
Source§fn on_bright_black(&self) -> Painted<&T>
fn on_bright_black(&self) -> Painted<&T>
Source§fn on_bright_red(&self) -> Painted<&T>
fn on_bright_red(&self) -> Painted<&T>
Source§fn on_bright_green(&self) -> Painted<&T>
fn on_bright_green(&self) -> Painted<&T>
Source§fn on_bright_yellow(&self) -> Painted<&T>
fn on_bright_yellow(&self) -> Painted<&T>
Source§fn on_bright_blue(&self) -> Painted<&T>
fn on_bright_blue(&self) -> Painted<&T>
Source§fn on_bright_magenta(&self) -> Painted<&T>
fn on_bright_magenta(&self) -> Painted<&T>
Source§fn on_bright_cyan(&self) -> Painted<&T>
fn on_bright_cyan(&self) -> Painted<&T>
Source§fn on_bright_white(&self) -> Painted<&T>
fn on_bright_white(&self) -> Painted<&T>
Source§fn attr(&self, value: Attribute) -> Painted<&T>
fn attr(&self, value: Attribute) -> Painted<&T>
Enables the styling Attribute
value
.
This method should be used rarely. Instead, prefer to use
attribute-specific builder methods like bold()
and
underline()
, which have the same functionality
but are pithier.
§Example
Make text bold using attr()
:
use yansi::{Paint, Attribute};
painted.attr(Attribute::Bold);
Make text bold using using bold()
.
use yansi::Paint;
painted.bold();
Source§fn rapid_blink(&self) -> Painted<&T>
fn rapid_blink(&self) -> Painted<&T>
Source§fn quirk(&self, value: Quirk) -> Painted<&T>
fn quirk(&self, value: Quirk) -> Painted<&T>
Enables the yansi
Quirk
value
.
This method should be used rarely. Instead, prefer to use quirk-specific
builder methods like mask()
and
wrap()
, which have the same functionality but are
pithier.
§Example
Enable wrapping using .quirk()
:
use yansi::{Paint, Quirk};
painted.quirk(Quirk::Wrap);
Enable wrapping using wrap()
.
use yansi::Paint;
painted.wrap();
Source§fn clear(&self) -> Painted<&T>
👎Deprecated since 1.0.1: renamed to resetting()
due to conflicts with Vec::clear()
.
The clear()
method will be removed in a future release.
fn clear(&self) -> Painted<&T>
resetting()
due to conflicts with Vec::clear()
.
The clear()
method will be removed in a future release.Source§fn whenever(&self, value: Condition) -> Painted<&T>
fn whenever(&self, value: Condition) -> Painted<&T>
Conditionally enable styling based on whether the Condition
value
applies. Replaces any previous condition.
See the crate level docs for more details.
§Example
Enable styling painted
only when both stdout
and stderr
are TTYs:
use yansi::{Paint, Condition};
painted.red().on_yellow().whenever(Condition::STDOUTERR_ARE_TTY);
Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.