Struct mz_compute::render::context::Context

source ·
pub struct Context<S: Scope, T = Timestamp>{
    pub(crate) scope: S,
    pub debug_name: String,
    pub dataflow_id: usize,
    pub as_of_frontier: Antichain<T>,
    pub until: Antichain<T>,
    pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
    pub(super) shutdown_token: ShutdownToken,
    pub(super) hydration_logger: Option<HydrationLogger>,
    pub(super) compute_logger: Option<Logger<ComputeEvent, WorkerIdentifier>>,
    pub(super) linear_join_spec: LinearJoinSpec,
    pub dataflow_expiration: Antichain<T>,
}
Expand description

Dataflow-local collections and arrangements.

A context is meant to wrap available data assets and present them in an easy-to-use manner. These assets include dataflow-local collections and arrangements, as well as imported arrangements from outside the dataflow.

Context has two timestamp types, one from S::Timestamp and one from T, where the former must refine the latter. The former is the timestamp used by the scope in question, and the latter is the timestamp of imported traces. The two may be different in the case of regions or iteration.

Fields§

§scope: S

The scope within which all managed collections exist.

It is an error to add any collections not contained in this scope.

§debug_name: String

The debug name of the dataflow associated with this context.

§dataflow_id: usize

The Timely ID of the dataflow associated with this context.

§as_of_frontier: Antichain<T>

Frontier before which updates should not be emitted.

We must apply it to sinks, to ensure correct outputs. We should apply it to sources and imported traces, because it improves performance.

§until: Antichain<T>

Frontier after which updates should not be emitted. Used to limit the amount of work done when appropriate.

§bindings: BTreeMap<Id, CollectionBundle<S, T>>

Bindings of identifiers to collections.

§shutdown_token: ShutdownToken

A token that operators can probe to know whether the dataflow is shutting down.

§hydration_logger: Option<HydrationLogger>

A logger that operators can use to report hydration events.

None if no hydration events should be logged in this context.

§compute_logger: Option<Logger<ComputeEvent, WorkerIdentifier>>

The logger, from Timely’s logging framework, if logs are enabled.

§linear_join_spec: LinearJoinSpec

Specification for rendering linear joins.

§dataflow_expiration: Antichain<T>

The expiration time for dataflows in this context. The output’s frontier should never advance past this frontier, except to the empty frontier.

Implementations§

source§

impl<S: Scope> Context<S>

source

pub fn for_dataflow_in<Plan>( dataflow: &DataflowDescription<Plan, CollectionMetadata>, scope: S, compute_state: &ComputeState, until: Antichain<Timestamp>, dataflow_expiration: Antichain<Timestamp>, ) -> Self

Creates a new empty Context.

source§

impl<S: Scope, T> Context<S, T>

source

pub fn insert_id( &mut self, id: Id, collection: CollectionBundle<S, T>, ) -> Option<CollectionBundle<S, T>>

Insert a collection bundle by an identifier.

This is expected to be used to install external collections (sources, indexes, other views), as well as for Let bindings of local collections.

source

pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>>

Remove a collection bundle by an identifier.

The primary use of this method is uninstalling Let bindings.

source

pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>)

Melds a collection bundle to whatever exists.

source

pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>>

Look up a collection bundle by an identifier.

source

pub(super) fn error_logger(&self) -> ErrorLogger

source§

impl<S: Scope, T> Context<S, T>

source

pub fn enter_region<'a>( &self, region: &Child<'a, S, S::Timestamp>, bindings: Option<&BTreeSet<Id>>, ) -> Context<Child<'a, S, S::Timestamp>, T>

Brings the underlying arrangements and collections into a region.

source§

impl<G> Context<G>

source

pub fn render_flat_map( &self, input: CollectionBundle<G>, func: TableFunc, exprs: Vec<MirScalarExpr>, mfp: MapFilterProject, input_key: Option<Vec<MirScalarExpr>>, ) -> CollectionBundle<G>

Applies a TableFunc to every row, followed by an mfp.

source§

impl<G> Context<G>

source

pub fn render_delta_join( &self, inputs: Vec<CollectionBundle<G>>, join_plan: DeltaJoinPlan, ) -> CollectionBundle<G>

Renders MirRelationExpr::Join using dogs^3 delta query dataflows.

The join is followed by the application of map_filter_project, whose implementation will be pushed into the join pipeline if at all possible.

source§

impl<G, T> Context<G, T>

source

pub(crate) fn render_join( &self, inputs: Vec<CollectionBundle<G, T>>, linear_plan: LinearJoinPlan, ) -> CollectionBundle<G, T>

source

fn render_join_inner( &self, inputs: Vec<CollectionBundle<G, T>>, linear_plan: LinearJoinPlan, inner: &mut Child<'_, G, <G as ScopeParent>::Timestamp>, ) -> CollectionBundle<G, T>

source

fn differential_join<S>( &self, joined: JoinedFlavor<S, T>, lookup_relation: CollectionBundle<S, T>, _: LinearStagePlan, errors: &mut Vec<Collection<S, DataflowError, Diff>>, ) -> Collection<S, Row, Diff>
where S: Scope<Timestamp = G::Timestamp>,

Looks up the arrangement for the next input and joins it to the arranged version of the join of previous inputs.

source

fn differential_join_inner<S, Tr1, Tr2>( &self, prev_keyed: Arranged<S, Tr1>, next_input: Arranged<S, Tr2>, closure: JoinClosure, ) -> (Collection<S, Row, Diff>, Option<Collection<S, DataflowError, Diff>>)
where S: Scope<Timestamp = G::Timestamp>, Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static, Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff> + Clone + 'static, for<'a> Tr1::Key<'a>: ToDatumIter, for<'a> Tr1::Val<'a>: ToDatumIter, for<'a> Tr2::Val<'a>: ToDatumIter,

Joins the arrangement for next_input to the arranged version of the join of previous inputs. This is split into its own method to enable reuse of code with different types of next_input.

The return type includes an optional error collection, which may be None if we can determine that closure cannot error.

source§

impl<G, T> Context<G, T>

source

pub fn render_reduce( &self, input: CollectionBundle<G, T>, key_val_plan: KeyValPlan, reduce_plan: ReducePlan, input_key: Option<Vec<MirScalarExpr>>, mfp_after: Option<MapFilterProject>, ) -> CollectionBundle<G, T>

Renders a MirRelationExpr::Reduce using various non-obvious techniques to minimize worst-case incremental update times and memory footprint.

source

fn render_reduce_plan<S>( &self, plan: ReducePlan, collection: Collection<S, (Row, Row), Diff>, err_input: Collection<S, DataflowError, Diff>, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> CollectionBundle<S, T>
where S: Scope<Timestamp = G::Timestamp>,

Render a dataflow based on the provided plan.

The output will be an arrangement that looks the same as if we just had a single reduce operator computing everything together, and this arrangement can also be re-used.

source

fn render_reduce_plan_inner<S>( &self, plan: ReducePlan, collection: Collection<S, (Row, Row), Diff>, errors: &mut Vec<Collection<S, DataflowError, Diff>>, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> MzArrangement<S>
where S: Scope<Timestamp = G::Timestamp>,

source

fn build_collation<S>( &self, arrangements: Vec<(ReductionType, Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>)>, aggregate_types: Vec<ReductionType>, scope: &mut S, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

Build the dataflow to combine arrangements containing results of different aggregation types into a single arrangement.

This computes the same thing as a join on the group key followed by shuffling the values into the correct order. This implementation assumes that all input arrangements present values in a way that respects the desired output order, so we can do a linear merge to form the output.

source

fn dispatch_build_distinct<S>( &self, collection: Collection<S, (Row, Row), Diff>, mfp_after: Option<SafeMfpPlan>, ) -> (MzArrangement<S>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

source

fn build_distinct<T1, T2, S>( &self, collection: Collection<S, (Row, Row), Diff>, tag: &str, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<T1>>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>, for<'a> T1: Trace<Val<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static, for<'a, 'a> T1::Key<'a>: IntoOwned<'a, Owned = Row> + Debug + ToDatumIter, T1::Batch: Batch, T1::Batcher: Batcher<Input = Vec<((Row, Row), G::Timestamp, Diff)>>, <T1::Batcher as Batcher>::Output: Container + PushInto<((Row, Row), T1::Time, T1::Diff)>, T1::Builder: Builder, Arranged<S, TraceAgent<T1>>: ArrangementSize, T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Val<'a> = &'a DataflowError, Time = G::Timestamp, Diff = Diff> + 'static, T2::Batch: Batch, T2::Batcher: Batcher<Input = Vec<((Row, DataflowError), G::Timestamp, Diff)>>, <T2::Batcher as Batcher>::Output: Container + PushInto<((Row, DataflowError), T2::Time, T2::Diff)>, Arranged<S, TraceAgent<T2>>: ArrangementSize,

Build the dataflow to compute the set of distinct keys.

source

fn build_basic_aggregates<S>( &self, input: Collection<S, (Row, Row), Diff>, aggrs: Vec<(usize, AggregateExpr)>, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

Build the dataflow to compute and arrange multiple non-accumulable, non-hierarchical aggregations on input.

This function assumes that we are explicitly rendering multiple basic aggregations. For each aggregate, we render a different reduce operator, and then fuse results together into a final arrangement that presents all the results in the order specified by aggrs.

source

fn build_basic_aggregate<S>( &self, input: Collection<S, (Row, Row), Diff>, index: usize, aggr: &AggregateExpr, validating: bool, key_arity: usize, mfp_after: Option<SafeMfpPlan>, fused_unnest_list: bool, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Option<Collection<S, DataflowError, Diff>>)
where S: Scope<Timestamp = G::Timestamp>,

Build the dataflow to compute a single basic aggregation.

This method also applies distinctness if required.

source

fn build_reduce_inaccumulable_distinct<S, V, Tr>( &self, input: Collection<S, Row, Diff>, name_tag: Option<&str>, ) -> Arranged<S, TraceAgent<Tr>>
where S: Scope<Timestamp = G::Timestamp>, V: MaybeValidatingRow<(), String>, Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static, Tr::Batch: Batch, Tr::Builder: Builder<Input = TimelyStack<((Row, V), G::Timestamp, Diff)>>, for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = Row>, for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>, Arranged<S, TraceAgent<Tr>>: ArrangementSize,

source

fn build_bucketed<S>( &self, input: Collection<S, (Row, Row), Diff>, _: BucketedPlan, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

Build the dataflow to compute and arrange multiple hierarchical aggregations on non-monotonic inputs.

This function renders a single reduction tree that computes aggregations with a priority queue implemented with a series of reduce operators that partition the input into buckets, and compute the aggregation over very small buckets and feed the results up to larger buckets.

Note that this implementation currently ignores the distinct bit because we currently only perform min / max hierarchically and the reduction tree efficiently suppresses non-distinct updates.

buckets indicates the number of buckets in this stage. We do some non-obvious trickery here to limit the memory usage per layer by internally holding only the elements that were rejected by this stage. However, the output collection maintains the ((key, bucket), (passing value)) for this stage.

source

fn build_bucketed_stage<S>( &self, aggr_funcs: &Vec<AggregateFunc>, input: &Collection<S, (Row, Row), Diff>, validating: bool, ) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)
where S: Scope<Timestamp = G::Timestamp>,

Build a bucketed stage fragment that wraps Self::build_bucketed_negated_output, and adds validation if validating is true. It returns the consolidated inputs concatenated with the negation of what’s produced by the reduction. validating indicates whether we want this stage to perform error detection for invalid accumulations. Once a stage is clean of such errors, subsequent stages can skip validation.

source

fn build_bucketed_negated_output<S, V, Tr>( &self, input: &Collection<S, (Row, Row), Diff>, aggrs: Vec<AggregateFunc>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), G::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), G::Timestamp, Diff)>, ColumnationChunker<((Row, Row), G::Timestamp, Diff)>, ColumnationMerger<((Row, Row), G::Timestamp, Diff)>, G::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), G::Timestamp, Diff)>, TimelyStack<((Row, Row), G::Timestamp, Diff)>>>>>>, Arranged<S, TraceAgent<Tr>>)
where S: Scope<Timestamp = G::Timestamp>, V: MaybeValidatingRow<Row, Row>, Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static, Tr::Batch: Batch, Tr::Builder: Builder<Input = TimelyStack<((Row, V), G::Timestamp, Diff)>>, for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>, Arranged<S, TraceAgent<Tr>>: ArrangementSize,

Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction, with all diffs in the reduction’s output negated.

source

fn build_monotonic<S>( &self, collection: Collection<S, (Row, Row), Diff>, _: MonotonicPlan, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

Build the dataflow to compute and arrange multiple hierarchical aggregations on monotonic inputs.

source

fn build_accumulable<S>( &self, collection: Collection<S, (Row, Row), Diff>, _: AccumulablePlan, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, MergeBatcher<Vec<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationChunker<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, ColumnationMerger<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, <S as ScopeParent>::Timestamp>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>, TimelyStack<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

Build the dataflow to compute and arrange multiple accumulable aggregations.

The incoming values are moved to the update’s “difference” field, at which point they can be accumulated in place. The count operator promotes the accumulated values to data, at which point a final map applies operator-specific logic to yield the final aggregate.

source§

impl<'g, G, T> Context<Child<'g, G, T>>
where G: Scope<Timestamp = Timestamp>, T: RenderTimestamp,

source

pub(crate) fn export_sink( &self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, sink_id: GlobalId, sink: &ComputeSinkDesc<CollectionMetadata>, start_signal: StartSignal, ct_times: Option<Collection<G, (), Diff>>, )

Export the sink described by sink from the rendering context.

source§

impl<G, T> Context<G, T>

source

pub(crate) fn render_threshold( &self, input: CollectionBundle<G, T>, threshold_plan: ThresholdPlan, ) -> CollectionBundle<G, T>

source§

impl<G> Context<G>

source

pub(crate) fn render_topk( &self, input: CollectionBundle<G>, top_k_plan: TopKPlan, ) -> CollectionBundle<G>

source

fn build_topk<S>( &self, collection: Collection<S, Row, Diff>, group_key: Vec<usize>, order_key: Vec<ColumnOrder>, offset: usize, limit: Option<MirScalarExpr>, arity: usize, buckets: Vec<u64>, ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

Constructs a TopK dataflow subgraph.

source

fn build_topk_stage<S>( &self, collection: Collection<S, (Row, Row), Diff>, order_key: Vec<ColumnOrder>, modulus: u64, offset: usize, limit: Option<MirScalarExpr>, arity: usize, validating: bool, ) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)
where S: Scope<Timestamp = G::Timestamp>,

To provide a robust incremental orderby-limit experience, we want to avoid grouping all records (or even large groups) and then applying the ordering and limit. Instead, a more robust approach forms groups of bounded size and applies the offset and limit to each, and then increases the sizes of the groups.

Builds a “stage”, which uses a finer grouping than is required to reduce the volume of updates, and to reduce the amount of work on the critical path for updates. The cost is a larger number of arrangements when this optimization does nothing beneficial.

The function accepts a collection of the form (hash_key, row), a modulus it applies to the hash_key’s hash datum, an offset for returning results, and a limit to restrict the output size. arity represents the number of columns in the input data, and if validating is true, we check for negative multiplicities, which indicate an error in the input data.

The output of this function is not consolidated.

The dataflow fragment has the following shape:

    | input
    |
  arrange
    |\
    | \
    |  reduce
    |  |
    concat
    |
    | output

There are additional map/flat_map operators as well as error demuxing operators, but we’re omitting them here for the sake of simplicity.

source

fn render_top1_monotonic<S>( &self, collection: Collection<S, Row, Diff>, group_key: Vec<usize>, order_key: Vec<ColumnOrder>, must_consolidate: bool, ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
where S: Scope<Timestamp = G::Timestamp>,

source§

impl<'g, G, T> Context<Child<'g, G, T>>
where G: Scope<Timestamp = Timestamp>, T: Refines<G::Timestamp> + RenderTimestamp,

source

pub(crate) fn import_index( &mut self, compute_state: &mut ComputeState, tokens: &mut BTreeMap<GlobalId, Rc<dyn Any>>, input_probe: Handle<Timestamp>, idx_id: GlobalId, idx: &IndexDesc, start_signal: StartSignal, )

source§

impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>
where G: Scope<Timestamp = Timestamp>,

source

pub(crate) fn export_index( &self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, idx_id: GlobalId, idx: &IndexDesc, )

source§

impl<'g, G, T> Context<Child<'g, G, T>>
where G: Scope<Timestamp = Timestamp>, T: RenderTimestamp,

source

pub(crate) fn export_index_iterative( &self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, idx_id: GlobalId, idx: &IndexDesc, )

source

fn dispatch_rearrange_iterative( &self, oks: MzArrangement<Child<'g, G, T>>, name: &str, ) -> MzArrangement<G>

Dispatches the rearranging of an arrangement coming from an iterative scope according to specialized key-value arrangement types.

source§

impl<G> Context<G>
where G: Scope<Timestamp = Product<Timestamp, PointStamp<u64>>>,

source

pub fn render_recursive_plan( &mut self, object_id: GlobalId, level: usize, plan: RenderPlan, ) -> CollectionBundle<G>

Renders a plan to a differential dataflow, producing the collection of results.

This method allows for plan to contain RecBinds, and is planned in the context of level pre-existing iteration coordinates.

This method recursively descends RecBind values, establishing nested scopes for each and establishing the appropriate recursive dependencies among the bound variables. Once all RecBinds have been rendered it calls into render_plan, which will error if further RecBinds are found.

The method requires that all variables conclude with a physical representation that contains a collection (i.e. a non-arrangement), and it will panic otherwise.

source§

impl<G> Context<G>

source

pub fn render_plan( &mut self, object_id: GlobalId, plan: RenderPlan, ) -> CollectionBundle<G>

Renders a non-recursive plan to a differential dataflow, producing the collection of results.

The return type reflects the uncertainty about the data representation, perhaps as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.

§Panics

Panics if the given plan contains any RecBinds. Recursive plans must be rendered using render_recursive_plan instead.

source

fn render_letfree_plan( &mut self, object_id: GlobalId, plan: LetFreePlan, ) -> CollectionBundle<G>

Renders a let-free plan to a differential dataflow, producing the collection of results.

source

fn render_plan_expr( &mut self, expr: Expr, collections: &BTreeMap<LirId, CollectionBundle<G>>, ) -> CollectionBundle<G>

Renders a render_plan::Expr, producing the collection of results.

§Panics

Panics if any of the expr’s inputs is not found in collections. Callers must ensure that input nodes have been rendered previously.

source

fn log_dataflow_global_id(&self, id: usize, global_id: GlobalId)

source

fn log_lir_mapping( &self, global_id: GlobalId, mapping: Box<[(LirId, LirMetadata)]>, )

source

fn log_operator_hydration( &self, bundle: &mut CollectionBundle<G>, lir_id: LirId, )

source

fn log_operator_hydration_inner<D>( &self, stream: &Stream<G, D>, lir_id: LirId, ) -> Stream<G, D>
where D: Clone + 'static,

Auto Trait Implementations§

§

impl<S, T> Freeze for Context<S, T>
where <S as ScopeParent>::Timestamp: Sized, S: Freeze, T: Freeze,

§

impl<S, T = Timestamp> !RefUnwindSafe for Context<S, T>

§

impl<S, T = Timestamp> !Send for Context<S, T>

§

impl<S, T = Timestamp> !Sync for Context<S, T>

§

impl<S, T> Unpin for Context<S, T>
where <S as ScopeParent>::Timestamp: Sized, S: Unpin, T: Unpin,

§

impl<S, T = Timestamp> !UnwindSafe for Context<S, T>

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T, U> CastInto<U> for T
where U: CastFrom<T>,

source§

fn cast_into(self) -> U

Performs the cast.
source§

impl<T> Conv for T

source§

fn conv<T>(self) -> T
where Self: Into<T>,

Converts self into T using Into<T>. Read more
source§

impl<T> FmtForward for T

source§

fn fmt_binary(self) -> FmtBinary<Self>
where Self: Binary,

Causes self to use its Binary implementation when Debug-formatted.
source§

fn fmt_display(self) -> FmtDisplay<Self>
where Self: Display,

Causes self to use its Display implementation when Debug-formatted.
source§

fn fmt_lower_exp(self) -> FmtLowerExp<Self>
where Self: LowerExp,

Causes self to use its LowerExp implementation when Debug-formatted.
source§

fn fmt_lower_hex(self) -> FmtLowerHex<Self>
where Self: LowerHex,

Causes self to use its LowerHex implementation when Debug-formatted.
source§

fn fmt_octal(self) -> FmtOctal<Self>
where Self: Octal,

Causes self to use its Octal implementation when Debug-formatted.
source§

fn fmt_pointer(self) -> FmtPointer<Self>
where Self: Pointer,

Causes self to use its Pointer implementation when Debug-formatted.
source§

fn fmt_upper_exp(self) -> FmtUpperExp<Self>
where Self: UpperExp,

Causes self to use its UpperExp implementation when Debug-formatted.
source§

fn fmt_upper_hex(self) -> FmtUpperHex<Self>
where Self: UpperHex,

Causes self to use its UpperHex implementation when Debug-formatted.
source§

fn fmt_list(self) -> FmtList<Self>
where &'a Self: for<'a> IntoIterator,

Formats each item in a sequence. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> FutureExt for T

source§

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
source§

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper. Read more
source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<Unshared, Shared> IntoShared<Shared> for Unshared
where Shared: FromUnshared<Unshared>,

source§

fn into_shared(self) -> Shared

Creates a shared type from an unshared type.
source§

impl<T, U> OverrideFrom<Option<&T>> for U
where U: OverrideFrom<T>,

source§

fn override_from(self, layer: &Option<&T>) -> U

Override the configuration represented by Self with values from the given layer.
source§

impl<T> Pipe for T
where T: ?Sized,

source§

fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> R
where Self: Sized,

Pipes by value. This is generally the method you want to use. Read more
source§

fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> R
where R: 'a,

Borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> R
where R: 'a,

Mutably borrows self and passes that borrow into the pipe function. Read more
source§

fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
where Self: Borrow<B>, B: 'a + ?Sized, R: 'a,

Borrows self, then passes self.borrow() into the pipe function. Read more
source§

fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
where Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.borrow_mut() into the pipe function. Read more
source§

fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
where Self: AsRef<U>, U: 'a + ?Sized, R: 'a,

Borrows self, then passes self.as_ref() into the pipe function.
source§

fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
where Self: AsMut<U>, U: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.as_mut() into the pipe function.
source§

fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
where Self: Deref<Target = T>, T: 'a + ?Sized, R: 'a,

Borrows self, then passes self.deref() into the pipe function.
source§

fn pipe_deref_mut<'a, T, R>( &'a mut self, func: impl FnOnce(&'a mut T) -> R, ) -> R
where Self: DerefMut<Target = T> + Deref, T: 'a + ?Sized, R: 'a,

Mutably borrows self, then passes self.deref_mut() into the pipe function.
source§

impl<T> Pointable for T

source§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer. Read more
source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<P, R> ProtoType<R> for P
where R: RustType<P>,

source§

impl<T> Same for T

§

type Output = T

Should always be Self
source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
source§

impl<T> Tap for T

source§

fn tap(self, func: impl FnOnce(&Self)) -> Self

Immutable access to a value. Read more
source§

fn tap_mut(self, func: impl FnOnce(&mut Self)) -> Self

Mutable access to a value. Read more
source§

fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Immutable access to the Borrow<B> of a value. Read more
source§

fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Mutable access to the BorrowMut<B> of a value. Read more
source§

fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Immutable access to the AsRef<R> view of a value. Read more
source§

fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Mutable access to the AsMut<R> view of a value. Read more
source§

fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Immutable access to the Deref::Target of a value. Read more
source§

fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Mutable access to the Deref::Target of a value. Read more
source§

fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self

Calls .tap() only in debug builds, and is erased in release builds.
source§

fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self

Calls .tap_mut() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
where Self: Borrow<B>, B: ?Sized,

Calls .tap_borrow() only in debug builds, and is erased in release builds.
source§

fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
where Self: BorrowMut<B>, B: ?Sized,

Calls .tap_borrow_mut() only in debug builds, and is erased in release builds.
source§

fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
where Self: AsRef<R>, R: ?Sized,

Calls .tap_ref() only in debug builds, and is erased in release builds.
source§

fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
where Self: AsMut<R>, R: ?Sized,

Calls .tap_ref_mut() only in debug builds, and is erased in release builds.
source§

fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
where Self: Deref<Target = T>, T: ?Sized,

Calls .tap_deref() only in debug builds, and is erased in release builds.
source§

fn tap_deref_mut_dbg<T>(self, func: impl FnOnce(&mut T)) -> Self
where Self: DerefMut<Target = T> + Deref, T: ?Sized,

Calls .tap_deref_mut() only in debug builds, and is erased in release builds.
source§

impl<T> TryConv for T

source§

fn try_conv<T>(self) -> Result<T, Self::Error>
where Self: TryInto<T>,

Attempts to convert self into T using TryInto<T>. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

source§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more