Struct mz_timely_util::builder_async::OperatorBuilder
pub struct OperatorBuilder<G: Scope> { /* private fields */ }
Builds async operators with generic shape.
Implementations
impl<G: Scope> OperatorBuilder<G>
pub fn new(name: String, scope: G) -> Self
Allocates a new generic async operator builder from its containing scope.
pub fn new_input_for<D: Container, P>(
    &mut self,
    stream: &StreamCore<G, D>,
    pact: P,
    output: &dyn OutputIndex
) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
where
    P: ParallelizationContract<G::Timestamp, D>,
Adds a new input that is connected to the specified output, returning the async input handle to use.
pub fn new_input_for_many<const N: usize, D: Container, P>(
    &mut self,
    stream: &StreamCore<G, D>,
    pact: P,
    outputs: [&dyn OutputIndex; N]
) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
where
    P: ParallelizationContract<G::Timestamp, D>,
Adds a new input that is connected to the specified outputs, returning the async input handle to use.
pub fn new_disconnected_input<D: Container, P>(
    &mut self,
    stream: &StreamCore<G, D>,
    pact: P
) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
where
    P: ParallelizationContract<G::Timestamp, D>,
Adds a new input that is not connected to any output, returning the async input handle to use.
pub fn new_input_connection<D: Container, P, C>(
    &mut self,
    stream: &StreamCore<G, D>,
    pact: P,
    connection: C
) -> AsyncInputHandle<G::Timestamp, D, C>
Adds a new input with connection information, returning the async input handle to use.
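The return types above suggest that the connection kind is carried in the type of the input handle (ConnectedToOne, ConnectedToMany<N>, Disconnected, or a caller-supplied C). The following is a minimal, std-only sketch of that general pattern, not the definitions from mz_timely_util: the marker structs reuse the names from the signatures for readability, while the Connection trait, SketchInputHandle, sketch_input, and outputs_of are hypothetical names introduced only for illustration.

```rust
/// Hypothetical marker: the input is connected to no output.
struct Disconnected;
/// Hypothetical marker: the input is connected to the output at this index.
struct ConnectedToOne(usize);
/// Hypothetical marker: the input is connected to `N` outputs.
struct ConnectedToMany<const N: usize>([usize; N]);

/// Hypothetical trait: reports which output indices an input is connected to.
trait Connection {
    fn connected_outputs(&self) -> Vec<usize>;
}

impl Connection for Disconnected {
    fn connected_outputs(&self) -> Vec<usize> {
        Vec::new()
    }
}

impl Connection for ConnectedToOne {
    fn connected_outputs(&self) -> Vec<usize> {
        vec![self.0]
    }
}

impl<const N: usize> Connection for ConnectedToMany<N> {
    fn connected_outputs(&self) -> Vec<usize> {
        self.0.to_vec()
    }
}

/// Hypothetical stand-in for an input handle that remembers its connection type.
struct SketchInputHandle<C> {
    connection: C,
}

/// Generic constructor: the connection kind becomes part of the handle's type.
fn sketch_input<C: Connection>(connection: C) -> SketchInputHandle<C> {
    SketchInputHandle { connection }
}

/// Reads the connected output indices back from a handle.
fn outputs_of<C: Connection>(handle: &SketchInputHandle<C>) -> Vec<usize> {
    handle.connection.connected_outputs()
}
```

In this sketch a single generic constructor covers all three cases, and code that receives a handle can tell from its type alone how many outputs the input is tied to.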
pub fn new_output<D: Container>(
    &mut self
) -> (AsyncOutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>, StreamCore<G, D>)
Adds a new output, returning the output handle and stream.
pub fn build<B, L>(self, constructor: B) -> Button
Creates an operator implementation from the supplied logic constructor. Returns a shutdown button that, when pressed, causes the logic future to be dropped and the input handles to be drained. The button can be converted into a token by using Button::press_on_drop.
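The idea of converting a button into a token can be illustrated with a small, std-only sketch. It is not the Button type from this crate: SketchButton, PressOnDrop, and token_lifecycle are hypothetical names, and "pressing" is modeled as setting a shared flag rather than shutting down an operator.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a shutdown button: pressing sets a shared flag.
struct SketchButton {
    pressed: Rc<Cell<bool>>,
}

impl SketchButton {
    /// Presses the button explicitly.
    fn press(&self) {
        self.pressed.set(true);
    }

    /// Converts the button into a token that presses it when dropped.
    fn press_on_drop(self) -> PressOnDrop {
        PressOnDrop(self)
    }
}

/// Token that owns the button and presses it in its destructor.
struct PressOnDrop(SketchButton);

impl Drop for PressOnDrop {
    fn drop(&mut self) {
        self.0.press();
    }
}

/// Returns (pressed before the token is dropped, pressed after).
fn token_lifecycle() -> (bool, bool) {
    let pressed = Rc::new(Cell::new(false));
    let token = SketchButton { pressed: Rc::clone(&pressed) }.press_on_drop();
    let before = pressed.get();
    drop(token);
    (before, pressed.get())
}
```

Holding such a token ties the shutdown to ordinary ownership: whoever stores the token keeps the operator alive, and dropping it presses the button without an explicit call.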
pub fn build_fallible<E: 'static, F>(
    self,
    constructor: F
) -> (Button, StreamCore<G, Vec<Rc<E>>>)
Creates a fallible operator implementation from the supplied logic constructor. If the future resolves to an error, the error is emitted in the returned error stream, and the operator then waits indefinitely until the shutdown button is pressed.
Capability handling
Unlike OperatorBuilder::build, this method does not give owned capabilities to the constructor. All initial capabilities are wrapped in a CapabilitySet and a mutable reference to them is given instead. This avoids storing owned capabilities in the state of the logic future, which would make using the ? operator unsafe: an early return would drop the capabilities along with the future, so the frontiers would incorrectly advance, potentially causing incorrect actions downstream.
builder.build_fallible(|caps| Box::pin(async move {
    // Assert that we have the number of capability sets we expect.
    // `cap_set` will be a `&mut CapabilitySet<T>`:
    let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

    // Using a capability from the set to send data:
    output.give(&cap_set[0], 42);

    // Using cap_set to downgrade it:
    cap_set.downgrade([]);

    // Explicitly dropping the capability set:
    // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
    *cap_set = CapabilitySet::new();

    // !! BIG WARNING !!:
    // It is tempting to take a capability out of the set for convenience. This will
    // move the capability into the future state, tying its lifetime to it, so it gets
    // dropped when an error is hit, causing incorrect progress statements.
    let cap = cap_set.delayed(&Timestamp::minimum());
    *cap_set = CapabilitySet::new(); // DO NOT DO THIS

    // The logic future resolves to a `Result`; return `Ok` on success.
    Ok(())
}));
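The hazard described above can be reproduced without timely at all. The sketch below is a simplified model under stated assumptions: plain functions stand in for the logic future, SketchCap is a hypothetical stand-in for a capability whose destructor sets a flag (standing in for "the frontier advanced"), and failing_step, logic_owned, logic_borrowed, and released_after_error are names invented for illustration.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a capability: dropping it sets a shared flag.
struct SketchCap {
    released: Rc<Cell<bool>>,
}

impl Drop for SketchCap {
    fn drop(&mut self) {
        self.released.set(true);
    }
}

/// A step that always fails, so `?` always returns early.
fn failing_step() -> Result<(), String> {
    Err("boom".to_string())
}

/// Logic that owns its capability: the early return from `?` drops `_cap`.
fn logic_owned(_cap: SketchCap) -> Result<(), String> {
    failing_step()?;
    Ok(())
}

/// Logic that only borrows: the early return leaves the capability with the caller.
fn logic_borrowed(_cap: &mut Option<SketchCap>) -> Result<(), String> {
    failing_step()?;
    Ok(())
}

/// Returns whether the capability was released right after the error,
/// first for the owned variant, then for the borrowed variant.
fn released_after_error() -> (bool, bool) {
    let owned_flag = Rc::new(Cell::new(false));
    let _ = logic_owned(SketchCap { released: Rc::clone(&owned_flag) });

    let borrowed_flag = Rc::new(Cell::new(false));
    let mut slot = Some(SketchCap { released: Rc::clone(&borrowed_flag) });
    let _ = logic_borrowed(&mut slot);

    let result = (owned_flag.get(), borrowed_flag.get());
    drop(slot); // the caller decides when the capability is finally released
    result
}
```

In the owned variant the error path releases the capability as a side effect, which is the kind of incorrect progress statement the warning describes. In the borrowed variant the capability outlives the failed logic, so the caller stays in control of when it is released.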
pub fn operator_info(&self) -> OperatorInfo
Creates operator info for the operator.
Auto Trait Implementations
impl<G> Freeze for OperatorBuilder<G>
where
    G: Freeze,
impl<G> !RefUnwindSafe for OperatorBuilder<G>
impl<G> !Send for OperatorBuilder<G>
impl<G> !Sync for OperatorBuilder<G>
impl<G> Unpin for OperatorBuilder<G>
where
    G: Unpin,
    <G as ScopeParent>::Timestamp: Unpin,
    <<G as ScopeParent>::Timestamp as Timestamp>::Summary: Unpin,
impl<G> !UnwindSafe for OperatorBuilder<G>
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<R, O, T> CopyOnto<ConsecutiveOffsetPairs<R, O>> for T
fn copy_onto(
    self,
    target: &mut ConsecutiveOffsetPairs<R, O>
) -> <ConsecutiveOffsetPairs<R, O> as Region>::Index
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request