pub struct OperatorBuilder<G: Scope> {
builder: OperatorBuilder<G>,
activator: Activator,
operator_waker: Arc<TimelyWaker>,
input_frontiers: Vec<Antichain<G::Timestamp>>,
input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
output_flushes: Vec<Box<dyn FnMut()>>,
shutdown_handle: ButtonHandle,
shutdown_button: Button,
}
Expand description
Builds async operators with generic shape.
Fields§
§builder: OperatorBuilder<G>
§activator: Activator
The activator for this operator
operator_waker: Arc<TimelyWaker>
The waker set up to activate this timely operator when woken
input_frontiers: Vec<Antichain<G::Timestamp>>
The currently known upper frontier of each of the input handles.
input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>
Input queues for each of the declared inputs of the operator.
output_flushes: Vec<Box<dyn FnMut()>>
Holds type erased closures that flush an output handle when called. These handles will be automatically drained when the operator is scheduled after the logic future has been polled
shutdown_handle: ButtonHandle
A handle to check whether all workers have pressed the shutdown button.
A button to coordinate shutdown of this operator among workers.
Implementations§
Source§impl<G: Scope> OperatorBuilder<G>
impl<G: Scope> OperatorBuilder<G>
Sourcepub fn new(name: String, scope: G) -> Self
pub fn new(name: String, scope: G) -> Self
Allocates a new generic async operator builder from its containing scope.
Sourcepub fn new_input_for<D, P>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
output: &dyn OutputIndex,
) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
pub fn new_input_for<D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, output: &dyn OutputIndex, ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
Adds a new input that is connected to the specified output, returning the async input handle to use.
Sourcepub fn new_input_for_many<const N: usize, D, P>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
outputs: [&dyn OutputIndex; N],
) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
pub fn new_input_for_many<const N: usize, D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, outputs: [&dyn OutputIndex; N], ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
Adds a new input that is connected to the specified outputs, returning the async input handle to use.
Sourcepub fn new_disconnected_input<D, P>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
pub fn new_disconnected_input<D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
Adds a new input that is not connected to any output, returning the async input handle to use.
Sourcepub fn new_input_connection<D, P, C>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
connection: C,
) -> AsyncInputHandle<G::Timestamp, D, C>where
D: Container + 'static,
P: ParallelizationContract<G::Timestamp, D>,
C: InputConnection<G::Timestamp> + 'static,
pub fn new_input_connection<D, P, C>(
&mut self,
stream: &StreamCore<G, D>,
pact: P,
connection: C,
) -> AsyncInputHandle<G::Timestamp, D, C>where
D: Container + 'static,
P: ParallelizationContract<G::Timestamp, D>,
C: InputConnection<G::Timestamp> + 'static,
Adds a new input with connection information, returning the async input handle to use.
Sourcepub fn new_output<CB: ContainerBuilder>(
&mut self,
) -> (AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>)
pub fn new_output<CB: ContainerBuilder>( &mut self, ) -> (AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>)
Adds a new output, returning the output handle and stream.
Sourcepub fn build<B, L>(self, constructor: B) -> Button
pub fn build<B, L>(self, constructor: B) -> Button
Creates an operator implementation from supplied logic constructor. It returns a shutdown
button that when pressed it will cause the logic future to be dropped and input handles to
be drained. The button can be converted into a token by using
Button::press_on_drop
Sourcepub fn build_fallible<E: 'static, F>(
self,
constructor: F,
) -> (Button, StreamCore<G, Vec<Rc<E>>>)
pub fn build_fallible<E: 'static, F>( self, constructor: F, ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
Creates a fallible operator implementation from supplied logic constructor. If the Future
resolves to an error it will be emitted in the returned error stream and then the operator
will wait indefinitely until the shutdown button is pressed.
§Capability handling
Unlike OperatorBuilder::build
, this method does not give owned capabilities to the
constructor. All initial capabilities are wrapped in a CapabilitySet
and a mutable
reference to them is given instead. This is done to avoid storing owned capabilities in the
state of the logic future which would make using the ?
operator unsafe, since the
frontiers would incorrectly advance, potentially causing incorrect actions downstream.
builder.build_fallible(|caps| Box::pin(async move {
// Assert that we have the number of capabilities we expect
// `cap` will be a `&mut Option<Capability<T>>`:
let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();
// Using cap to send data:
output.give(&cap_set[0], 42);
// Using cap_set to downgrade it:
cap_set.downgrade([]);
// Explicitly dropping the capability:
// Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
*cap_set = CapabilitySet::new();
// !! BIG WARNING !!:
// It is tempting to `take` the capability out of the set for convenience. This will
// move the capability into the future state, tying its lifetime to it, which will get
// dropped when an error is hit, causing incorrect progress statements.
let cap = cap_set.delayed(&Timestamp::minimum());
*cap_set = CapabilitySet::new(); // DO NOT DO THIS
}));
Sourcepub fn operator_info(&self) -> OperatorInfo
pub fn operator_info(&self) -> OperatorInfo
Creates operator info for the operator.
Auto Trait Implementations§
impl<G> Freeze for OperatorBuilder<G>where
G: Freeze,
impl<G> !RefUnwindSafe for OperatorBuilder<G>
impl<G> !Send for OperatorBuilder<G>
impl<G> !Sync for OperatorBuilder<G>
impl<G> Unpin for OperatorBuilder<G>where
G: Unpin,
<G as ScopeParent>::Timestamp: Unpin,
<<G as ScopeParent>::Timestamp as Timestamp>::Summary: Unpin,
impl<G> !UnwindSafe for OperatorBuilder<G>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.