Struct mz_timely_util::builder_async::OperatorBuilder

source ·
pub struct OperatorBuilder<G: Scope> {
    builder: OperatorBuilder<G>,
    activator: Activator,
    operator_waker: Arc<TimelyWaker>,
    input_frontiers: Vec<Antichain<G::Timestamp>>,
    input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>,
    output_flushes: Vec<Box<dyn FnMut()>>,
    shutdown_handle: ButtonHandle,
    shutdown_button: Button,
}
Expand description

Builds async operators with generic shape.

Fields§

§builder: OperatorBuilder<G>§activator: Activator

The activator for this operator

§operator_waker: Arc<TimelyWaker>

The waker set up to activate this timely operator when woken

§input_frontiers: Vec<Antichain<G::Timestamp>>

The currently known upper frontier of each of the input handles.

§input_queues: Vec<Box<dyn InputQueue<G::Timestamp>>>

Input queues for each of the declared inputs of the operator.

§output_flushes: Vec<Box<dyn FnMut()>>

Holds type erased closures that flush an output handle when called. These handles will be automatically drained when the operator is scheduled after the logic future has been polled

§shutdown_handle: ButtonHandle

A handle to check whether all workers have pressed the shutdown button.

§shutdown_button: Button

A button to coordinate shutdown of this operator among workers.

Implementations§

source§

impl<G: Scope> OperatorBuilder<G>

source

pub fn new(name: String, scope: G) -> Self

Allocates a new generic async operator builder from its containing scope.

source

pub fn new_input_for<D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, output: &dyn OutputIndex, ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToOne>
where D: Container + 'static, P: ParallelizationContract<G::Timestamp, D>,

Adds a new input that is connected to the specified output, returning the async input handle to use.

source

pub fn new_input_for_many<const N: usize, D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, outputs: [&dyn OutputIndex; N], ) -> AsyncInputHandle<G::Timestamp, D, ConnectedToMany<N>>
where D: Container + 'static, P: ParallelizationContract<G::Timestamp, D>,

Adds a new input that is connected to the specified outputs, returning the async input handle to use.

source

pub fn new_disconnected_input<D, P>( &mut self, stream: &StreamCore<G, D>, pact: P, ) -> AsyncInputHandle<G::Timestamp, D, Disconnected>
where D: Container + 'static, P: ParallelizationContract<G::Timestamp, D>,

Adds a new input that is not connected to any output, returning the async input handle to use.

source

pub fn new_input_connection<D, P, C>( &mut self, stream: &StreamCore<G, D>, pact: P, connection: C, ) -> AsyncInputHandle<G::Timestamp, D, C>
where D: Container + 'static, P: ParallelizationContract<G::Timestamp, D>, C: InputConnection<G::Timestamp> + 'static,

Adds a new input with connection information, returning the async input handle to use.

source

pub fn new_output<CB: ContainerBuilder>( &mut self, ) -> (AsyncOutputHandle<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>)

Adds a new output, returning the output handle and stream.

source

pub fn build<B, L>(self, constructor: B) -> Button
where B: FnOnce(Vec<Capability<G::Timestamp>>) -> L, L: Future + 'static,

Creates an operator implementation from supplied logic constructor. It returns a shutdown button that when pressed it will cause the logic future to be dropped and input handles to be drained. The button can be converted into a token by using Button::press_on_drop

source

pub fn build_fallible<E: 'static, F>( self, constructor: F, ) -> (Button, StreamCore<G, Vec<Rc<E>>>)
where F: for<'a> FnOnce(&'a mut [CapabilitySet<G::Timestamp>]) -> Pin<Box<dyn Future<Output = Result<(), E>> + 'a>> + 'static,

Creates a fallible operator implementation from supplied logic constructor. If the Future resolves to an error it will be emitted in the returned error stream and then the operator will wait indefinitely until the shutdown button is pressed.

§Capability handling

Unlike OperatorBuilder::build, this method does not give owned capabilities to the constructor. All initial capabilities are wrapped in a CapabilitySet and a mutable reference to them is given instead. This is done to avoid storing owned capabilities in the state of the logic future which would make using the ? operator unsafe, since the frontiers would incorrectly advance, potentially causing incorrect actions downstream.

builder.build_fallible(|caps| Box::pin(async move {
    // Assert that we have the number of capabilities we expect
    // `cap` will be a `&mut Option<Capability<T>>`:
    let [cap_set]: &mut [_; 1] = caps.try_into().unwrap();

    // Using cap to send data:
    output.give(&cap_set[0], 42);

    // Using cap_set to downgrade it:
    cap_set.downgrade([]);

    // Explicitly dropping the capability:
    // Simply running `drop(cap_set)` will only drop the reference and not the capability set itself!
    *cap_set = CapabilitySet::new();

    // !! BIG WARNING !!:
    // It is tempting to `take` the capability out of the set for convenience. This will
    // move the capability into the future state, tying its lifetime to it, which will get
    // dropped when an error is hit, causing incorrect progress statements.
    let cap = cap_set.delayed(&Timestamp::minimum());
    *cap_set = CapabilitySet::new(); // DO NOT DO THIS
}));
source

pub fn operator_info(&self) -> OperatorInfo

Creates operator info for the operator.

source

pub fn activator(&self) -> &Activator

Returns the activator for the operator.

Auto Trait Implementations§

§

impl<G> Freeze for OperatorBuilder<G>
where G: Freeze,

§

impl<G> !RefUnwindSafe for OperatorBuilder<G>

§

impl<G> !Send for OperatorBuilder<G>

§

impl<G> !Sync for OperatorBuilder<G>

§

impl<G> Unpin for OperatorBuilder<G>

§

impl<G> !UnwindSafe for OperatorBuilder<G>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> CopyAs<T> for T

source§

fn copy_as(self) -> T

source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more