Struct mz_compute::render::context::Context
source · pub struct Context<S: Scope, T = Timestamp>where
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,{
pub(crate) scope: S,
pub debug_name: String,
pub dataflow_id: usize,
pub as_of_frontier: Antichain<T>,
pub until: Antichain<T>,
pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
pub(super) shutdown_token: ShutdownToken,
pub(super) hydration_logger: Option<HydrationLogger>,
pub(super) linear_join_spec: LinearJoinSpec,
pub(super) enable_operator_hydration_status_logging: bool,
}
Expand description
Dataflow-local collections and arrangements.
A context means to wrap available data assets and present them in an easy-to-use manner. These assets include dataflow-local collections and arrangements, as well as imported arrangements from outside the dataflow.
Context has two timestamp types, one from S::Timestamp
and one from T
, where the
former must refine the latter. The former is the timestamp used by the scope in question,
and the latter is the timestamp of imported traces. The two may be different in the case
of regions or iteration.
Fields§
§scope: S
The scope within which all managed collections exist.
It is an error to add any collections not contained in this scope.
debug_name: String
The debug name of the dataflow associated with this context.
dataflow_id: usize
The Timely ID of the dataflow associated with this context.
as_of_frontier: Antichain<T>
Frontier before which updates should not be emitted.
We must apply it to sinks, to ensure correct outputs. We should apply it to sources and imported traces, because it improves performance.
until: Antichain<T>
Frontier after which updates should not be emitted. Used to limit the amount of work done when appropriate.
bindings: BTreeMap<Id, CollectionBundle<S, T>>
Bindings of identifiers to collections.
shutdown_token: ShutdownToken
A token that operators can probe to know whether the dataflow is shutting down.
hydration_logger: Option<HydrationLogger>
A logger that operators can use to report hydration events.
None
if no hydration events should be logged in this context.
linear_join_spec: LinearJoinSpec
Specification for rendering linear joins.
enable_operator_hydration_status_logging: bool
Whether to log operator hydration status.
Implementations§
source§impl<S: Scope> Context<S>where
S::Timestamp: Lattice + Refines<Timestamp> + Columnation,
impl<S: Scope> Context<S>where S::Timestamp: Lattice + Refines<Timestamp> + Columnation,
sourcepub fn for_dataflow_in<Plan>(
dataflow: &DataflowDescription<Plan, CollectionMetadata>,
scope: S,
compute_state: &ComputeState
) -> Self
pub fn for_dataflow_in<Plan>( dataflow: &DataflowDescription<Plan, CollectionMetadata>, scope: S, compute_state: &ComputeState ) -> Self
Creates a new empty Context.
source§impl<S: Scope, T> Context<S, T>where
T: Timestamp + Lattice + Columnation,
S::Timestamp: Lattice + Refines<T> + Columnation,
impl<S: Scope, T> Context<S, T>where T: Timestamp + Lattice + Columnation, S::Timestamp: Lattice + Refines<T> + Columnation,
sourcepub fn insert_id(
&mut self,
id: Id,
collection: CollectionBundle<S, T>
) -> Option<CollectionBundle<S, T>>
pub fn insert_id( &mut self, id: Id, collection: CollectionBundle<S, T> ) -> Option<CollectionBundle<S, T>>
Insert a collection bundle by an identifier.
This is expected to be used to install external collections (sources, indexes, other views),
as well as for Let
bindings of local collections.
sourcepub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>>
pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>>
Remove a collection bundle by an identifier.
The primary use of this method is uninstalling Let
bindings.
sourcepub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>)
pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>)
Melds a collection bundle to whatever exists.
sourcepub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>>
pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>>
Look up a collection bundle by an identifier.
pub(super) fn error_logger(&self) -> ErrorLogger
source§impl<G> Context<G>where
G: Scope,
G::Timestamp: RenderTimestamp,
impl<G> Context<G>where G: Scope, G::Timestamp: RenderTimestamp,
sourcepub fn render_flat_map(
&mut self,
input: CollectionBundle<G>,
func: TableFunc,
exprs: Vec<MirScalarExpr>,
mfp: MapFilterProject,
input_key: Option<Vec<MirScalarExpr>>
) -> CollectionBundle<G>
pub fn render_flat_map( &mut self, input: CollectionBundle<G>, func: TableFunc, exprs: Vec<MirScalarExpr>, mfp: MapFilterProject, input_key: Option<Vec<MirScalarExpr>> ) -> CollectionBundle<G>
Renders relation_expr
followed by map_filter_project
if provided.
source§impl<G> Context<G>where
G: Scope,
G::Timestamp: RenderTimestamp,
impl<G> Context<G>where G: Scope, G::Timestamp: RenderTimestamp,
sourcepub fn render_delta_join(
&mut self,
inputs: Vec<CollectionBundle<G>>,
join_plan: DeltaJoinPlan
) -> CollectionBundle<G>
pub fn render_delta_join( &mut self, inputs: Vec<CollectionBundle<G>>, join_plan: DeltaJoinPlan ) -> CollectionBundle<G>
Renders MirRelationExpr:Join
using dogs^3 delta query dataflows.
The join is followed by the application of map_filter_project
, whose
implementation will be pushed in to the join pipeline if at all possible.
source§impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
impl<G, T> Context<G, T>where G: Scope, G::Timestamp: Lattice + Refines<T> + Columnation, T: Timestamp + Lattice + Columnation,
pub(crate) fn render_join( &mut self, inputs: Vec<CollectionBundle<G, T>>, linear_plan: LinearJoinPlan ) -> CollectionBundle<G, T>
sourcefn differential_join<S>(
&self,
joined: JoinedFlavor<S, T>,
lookup_relation: CollectionBundle<S, T>,
_: LinearStagePlan,
errors: &mut Vec<Collection<S, DataflowError, Diff>>
) -> Collection<S, Row, Diff>where
S: Scope<Timestamp = G::Timestamp>,
fn differential_join<S>( &self, joined: JoinedFlavor<S, T>, lookup_relation: CollectionBundle<S, T>, _: LinearStagePlan, errors: &mut Vec<Collection<S, DataflowError, Diff>> ) -> Collection<S, Row, Diff>where S: Scope<Timestamp = G::Timestamp>,
Looks up the arrangement for the next input and joins it to the arranged version of the join of previous inputs.
sourcefn differential_join_inner<S, Tr1, Tr2>(
&self,
prev_keyed: Arranged<S, Tr1>,
next_input: Arranged<S, Tr2>,
key_types: Option<Vec<ColumnType>>,
prev_types: Option<Vec<ColumnType>>,
next_types: Option<Vec<ColumnType>>,
closure: JoinClosure
) -> (Collection<S, Row, Diff>, Option<Collection<S, DataflowError, Diff>>)where
S: Scope<Timestamp = G::Timestamp>,
Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
for<'a> Tr1::Key<'a>: IntoRowByTypes,
for<'a> Tr1::Val<'a>: IntoRowByTypes,
for<'a> Tr2::Val<'a>: IntoRowByTypes,
fn differential_join_inner<S, Tr1, Tr2>( &self, prev_keyed: Arranged<S, Tr1>, next_input: Arranged<S, Tr2>, key_types: Option<Vec<ColumnType>>, prev_types: Option<Vec<ColumnType>>, next_types: Option<Vec<ColumnType>>, closure: JoinClosure ) -> (Collection<S, Row, Diff>, Option<Collection<S, DataflowError, Diff>>)where S: Scope<Timestamp = G::Timestamp>, Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static, Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff> + Clone + 'static, for<'a> Tr1::Key<'a>: IntoRowByTypes, for<'a> Tr1::Val<'a>: IntoRowByTypes, for<'a> Tr2::Val<'a>: IntoRowByTypes,
Joins the arrangement for next_input
to the arranged version of the
join of previous inputs. This is split into its own method to enable
reuse of code with different types of next_input
.
The return type includes an optional error collection, which may be
None
if we can determine that closure
cannot error.
source§impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
impl<G, T> Context<G, T>where G: Scope, G::Timestamp: Lattice + Refines<T> + Columnation, T: Timestamp + Lattice + Columnation,
sourcepub fn render_reduce(
&mut self,
input: CollectionBundle<G, T>,
key_val_plan: KeyValPlan,
reduce_plan: ReducePlan,
input_key: Option<Vec<MirScalarExpr>>,
mfp_after: Option<MapFilterProject>
) -> CollectionBundle<G, T>
pub fn render_reduce( &mut self, input: CollectionBundle<G, T>, key_val_plan: KeyValPlan, reduce_plan: ReducePlan, input_key: Option<Vec<MirScalarExpr>>, mfp_after: Option<MapFilterProject> ) -> CollectionBundle<G, T>
Renders a MirRelationExpr::Reduce
using various non-obvious techniques to
minimize worst-case incremental update times and memory footprint.
sourcefn render_reduce_plan<S>(
&self,
plan: ReducePlan,
collection: Collection<S, (Row, Row), Diff>,
err_input: Collection<S, DataflowError, Diff>,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>
) -> CollectionBundle<S, T>where
S: Scope<Timestamp = G::Timestamp>,
fn render_reduce_plan<S>( &self, plan: ReducePlan, collection: Collection<S, (Row, Row), Diff>, err_input: Collection<S, DataflowError, Diff>, key_arity: usize, mfp_after: Option<SafeMfpPlan> ) -> CollectionBundle<S, T>where S: Scope<Timestamp = G::Timestamp>,
Render a dataflow based on the provided plan.
The output will be an arrangements that looks the same as if we just had a single reduce operator computing everything together, and this arrangement can also be re-used.
fn render_reduce_plan_inner<S>( &self, plan: ReducePlan, collection: Collection<S, (Row, Row), Diff>, errors: &mut Vec<Collection<S, DataflowError, Diff>>, key_arity: usize, mfp_after: Option<SafeMfpPlan> ) -> SpecializedArrangement<S>where S: Scope<Timestamp = G::Timestamp>,
sourcefn build_collation<S>(
&self,
arrangements: Vec<(ReductionType, Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>)>,
aggregate_types: Vec<ReductionType>,
scope: &mut S,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_collation<S>( &self, arrangements: Vec<(ReductionType, Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>)>, aggregate_types: Vec<ReductionType>, scope: &mut S, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
Build the dataflow to combine arrangements containing results of different aggregation types into a single arrangement.
This computes the same thing as a join on the group key followed by shuffling the values into the correct order. This implementation assumes that all input arrangements present values in a way that respects the desired output order, so we can do a linear merge to form the output.
fn dispatch_build_distinct<S>( &self, collection: Collection<S, (Row, Row), Diff>, mfp_after: Option<SafeMfpPlan> ) -> (SpecializedArrangement<S>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
sourcefn build_distinct<T1, T2, S>(
&self,
collection: Collection<S, (Row, T1::ValOwned), Diff>,
tag: &str,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<T1>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
T1: Trace<KeyOwned = Row, Time = G::Timestamp, Diff = Diff> + 'static,
T1::Batch: Batch,
T1::Batcher: Batcher<Item = ((Row, T1::ValOwned), G::Timestamp, Diff)>,
for<'a> T1::Key<'a>: Debug + IntoRowByTypes,
T1::ValOwned: Columnation + ExchangeData + Default,
Arranged<S, TraceAgent<T1>>: ArrangementSize,
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, KeyOwned = Row, ValOwned = DataflowError, Time = G::Timestamp, Diff = Diff> + 'static,
T2::Batch: Batch,
T2::Batcher: Batcher<Item = ((Row, T2::ValOwned), G::Timestamp, Diff)>,
Arranged<S, TraceAgent<T2>>: ArrangementSize,
fn build_distinct<T1, T2, S>( &self, collection: Collection<S, (Row, T1::ValOwned), Diff>, tag: &str, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<T1>>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>, T1: Trace<KeyOwned = Row, Time = G::Timestamp, Diff = Diff> + 'static, T1::Batch: Batch, T1::Batcher: Batcher<Item = ((Row, T1::ValOwned), G::Timestamp, Diff)>, for<'a> T1::Key<'a>: Debug + IntoRowByTypes, T1::ValOwned: Columnation + ExchangeData + Default, Arranged<S, TraceAgent<T1>>: ArrangementSize, T2: for<'a> Trace<Key<'a> = T1::Key<'a>, KeyOwned = Row, ValOwned = DataflowError, Time = G::Timestamp, Diff = Diff> + 'static, T2::Batch: Batch, T2::Batcher: Batcher<Item = ((Row, T2::ValOwned), G::Timestamp, Diff)>, Arranged<S, TraceAgent<T2>>: ArrangementSize,
Build the dataflow to compute the set of distinct keys.
sourcefn build_basic_aggregates<S>(
&self,
input: Collection<S, (Row, Row), Diff>,
aggrs: Vec<(usize, AggregateExpr)>,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_basic_aggregates<S>( &self, input: Collection<S, (Row, Row), Diff>, aggrs: Vec<(usize, AggregateExpr)>, key_arity: usize, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
Build the dataflow to compute and arrange multiple non-accumulable,
non-hierarchical aggregations on input
.
This function assumes that we are explicitly rendering multiple basic aggregations.
For each aggregate, we render a different reduce operator, and then fuse
results together into a final arrangement that presents all the results
in the order specified by aggrs
.
sourcefn build_basic_aggregate<S>(
&self,
input: Collection<S, (Row, Row), Diff>,
index: usize,
aggr: &AggregateExpr,
validating: bool,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Option<Collection<S, DataflowError, Diff>>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_basic_aggregate<S>( &self, input: Collection<S, (Row, Row), Diff>, index: usize, aggr: &AggregateExpr, validating: bool, key_arity: usize, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Option<Collection<S, DataflowError, Diff>>)where S: Scope<Timestamp = G::Timestamp>,
Build the dataflow to compute a single basic aggregation.
This method also applies distinctness if required.
fn build_reduce_inaccumulable_distinct<S, Tr>( &self, input: Collection<S, Row, Diff>, name_tag: Option<&str> ) -> Arranged<S, TraceAgent<Tr>>where S: Scope<Timestamp = G::Timestamp>, Tr::ValOwned: MaybeValidatingRow<(), String>, Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static, Tr::Batch: Batch, Tr::Batcher: Batcher<Item = ((Row, Tr::ValOwned), G::Timestamp, Diff)>, Arranged<S, TraceAgent<Tr>>: ArrangementSize,
sourcefn build_bucketed<S>(
&self,
input: Collection<S, (Row, Row), Diff>,
_: BucketedPlan,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_bucketed<S>( &self, input: Collection<S, (Row, Row), Diff>, _: BucketedPlan, key_arity: usize, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
Build the dataflow to compute and arrange multiple hierarchical aggregations on non-monotonic inputs.
This function renders a single reduction tree that computes aggregations with a priority queue implemented with a series of reduce operators that partition the input into buckets, and compute the aggregation over very small buckets and feed the results up to larger buckets.
Note that this implementation currently ignores the distinct bit because we currently only perform min / max hierarchically and the reduction tree efficiently suppresses non-distinct updates.
sourcefn build_bucketed_negated_output<S, Tr>(
&self,
input: &Collection<S, (Row, Row), Diff>,
aggrs: Vec<AggregateFunc>
) -> Arranged<S, TraceAgent<Tr>>where
S: Scope<Timestamp = G::Timestamp>,
Tr::ValOwned: MaybeValidatingRow<Row, Row>,
Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static,
Tr::Batch: Batch,
Tr::Batcher: Batcher<Item = ((Row, Tr::ValOwned), G::Timestamp, Diff)>,
Arranged<S, TraceAgent<Tr>>: ArrangementSize,
fn build_bucketed_negated_output<S, Tr>( &self, input: &Collection<S, (Row, Row), Diff>, aggrs: Vec<AggregateFunc> ) -> Arranged<S, TraceAgent<Tr>>where S: Scope<Timestamp = G::Timestamp>, Tr::ValOwned: MaybeValidatingRow<Row, Row>, Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static, Tr::Batch: Batch, Tr::Batcher: Batcher<Item = ((Row, Tr::ValOwned), G::Timestamp, Diff)>, Arranged<S, TraceAgent<Tr>>: ArrangementSize,
Build the dataflow for one stage of a reduction tree for multiple hierarchical aggregates.
buckets
indicates the number of buckets in this stage. We do some non
obvious trickery here to limit the memory usage per layer by internally
holding only the elements that were rejected by this stage. However, the
output collection maintains the ((key, bucket), (passing value)
for this
stage.
validating
indicates whether we want this stage to perform error detection
for invalid accumulations. Once a stage is clean of such errors, subsequent
stages can skip validation.
sourcefn build_monotonic<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
_: MonotonicPlan,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_monotonic<S>( &self, collection: Collection<S, (Row, Row), Diff>, _: MonotonicPlan, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
Build the dataflow to compute and arrange multiple hierarchical aggregations on monotonic inputs.
sourcefn build_accumulable<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
_: AccumulablePlan,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_accumulable<S>( &self, collection: Collection<S, (Row, Row), Diff>, _: AccumulablePlan, key_arity: usize, mfp_after: Option<SafeMfpPlan> ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>, ColumnatedMergeBatcher<Row, Row, <S as ScopeParent>::Timestamp, Diff>, RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
Build the dataflow to compute and arrange multiple accumulable aggregations.
The incoming values are moved to the update’s “difference” field, at which point
they can be accumulated in place. The count
operator promotes the accumulated
values to data, at which point a final map applies operator-specific logic to
yield the final aggregate.
source§impl<'g, G, T> Context<Child<'g, G, T>>where
G: Scope<Timestamp = Timestamp>,
T: RenderTimestamp,
impl<'g, G, T> Context<Child<'g, G, T>>where G: Scope<Timestamp = Timestamp>, T: RenderTimestamp,
sourcepub(crate) fn export_sink(
&mut self,
compute_state: &mut ComputeState,
tokens: &BTreeMap<GlobalId, Rc<dyn Any>>,
dependency_ids: BTreeSet<GlobalId>,
sink_id: GlobalId,
sink: &ComputeSinkDesc<CollectionMetadata>
)
pub(crate) fn export_sink( &mut self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, sink_id: GlobalId, sink: &ComputeSinkDesc<CollectionMetadata> )
Export the sink described by sink
from the rendering context.
source§impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
impl<G, T> Context<G, T>where G: Scope, G::Timestamp: Lattice + Refines<T> + Columnation, T: Timestamp + Lattice + Columnation,
pub(crate) fn render_threshold( &self, input: CollectionBundle<G, T>, threshold_plan: ThresholdPlan ) -> CollectionBundle<G, T>
source§impl<G> Context<G>where
G: Scope,
G::Timestamp: RenderTimestamp,
impl<G> Context<G>where G: Scope, G::Timestamp: RenderTimestamp,
pub(crate) fn render_topk( &mut self, input: CollectionBundle<G>, top_k_plan: TopKPlan ) -> CollectionBundle<G>
sourcefn build_topk<S>(
&self,
collection: Collection<S, Row, Diff>,
group_key: Vec<usize>,
order_key: Vec<ColumnOrder>,
offset: usize,
limit: Option<MirScalarExpr>,
arity: usize,
buckets: Vec<u64>
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
fn build_topk<S>( &self, collection: Collection<S, Row, Diff>, group_key: Vec<usize>, order_key: Vec<ColumnOrder>, offset: usize, limit: Option<MirScalarExpr>, arity: usize, buckets: Vec<u64> ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
Constructs a TopK dataflow subgraph.
fn build_topk_stage<S>( &self, collection: Collection<S, (Row, Row), Diff>, order_key: Vec<ColumnOrder>, modulus: u64, offset: usize, limit: Option<MirScalarExpr>, arity: usize, validating: bool ) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)where S: Scope<Timestamp = G::Timestamp>,
fn render_top1_monotonic<S>( &self, collection: Collection<S, Row, Diff>, group_key: Vec<usize>, order_key: Vec<ColumnOrder>, must_consolidate: bool ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)where S: Scope<Timestamp = G::Timestamp>,
source§impl<'g, G, T> Context<Child<'g, G, T>>where
G: Scope<Timestamp = Timestamp>,
T: Refines<G::Timestamp> + RenderTimestamp,
impl<'g, G, T> Context<Child<'g, G, T>>where G: Scope<Timestamp = Timestamp>, T: Refines<G::Timestamp> + RenderTimestamp,
source§impl<G> Context<G>where
G: Scope,
G::Timestamp: RenderTimestamp,
impl<G> Context<G>where G: Scope, G::Timestamp: RenderTimestamp,
pub(crate) fn build_object(&mut self, object: BuildDesc<Plan>)
source§impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>where
G: Scope<Timestamp = Timestamp>,
impl<'g, G> Context<Child<'g, G, G::Timestamp>, G::Timestamp>where G: Scope<Timestamp = Timestamp>,
source§impl<'g, G, T> Context<Child<'g, G, T>>where
G: Scope<Timestamp = Timestamp>,
T: RenderTimestamp,
impl<'g, G, T> Context<Child<'g, G, T>>where G: Scope<Timestamp = Timestamp>, T: RenderTimestamp,
pub(crate) fn export_index_iterative( &mut self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, idx_id: GlobalId, idx: &IndexDesc )
sourcefn dispatch_rearrange_iterative(
&self,
oks: SpecializedArrangement<Child<'g, G, T>>,
name: &str
) -> SpecializedArrangement<G>
fn dispatch_rearrange_iterative( &self, oks: SpecializedArrangement<Child<'g, G, T>>, name: &str ) -> SpecializedArrangement<G>
Dispatches the rearranging of an arrangement coming from an iterative scope according to specialized key-value arrangement types.
sourcefn rearrange_iterative<Tr1, Tr2>(
&self,
oks: Arranged<Child<'g, G, T>, TraceAgent<Tr1>>,
name: &str
) -> Arranged<G, TraceAgent<Tr2>>where
Tr1: TraceReader<Time = T, Diff = Diff>,
Tr1::KeyOwned: Columnation + ExchangeData + Hashable,
Tr1::ValOwned: Columnation + ExchangeData,
Tr2: Trace + TraceReader<Time = G::Timestamp, Diff = Diff> + 'static,
Tr2::Batch: Batch,
Tr2::Batcher: Batcher<Item = ((Tr1::KeyOwned, Tr1::ValOwned), G::Timestamp, Diff)>,
Arranged<G, TraceAgent<Tr2>>: ArrangementSize,
fn rearrange_iterative<Tr1, Tr2>( &self, oks: Arranged<Child<'g, G, T>, TraceAgent<Tr1>>, name: &str ) -> Arranged<G, TraceAgent<Tr2>>where Tr1: TraceReader<Time = T, Diff = Diff>, Tr1::KeyOwned: Columnation + ExchangeData + Hashable, Tr1::ValOwned: Columnation + ExchangeData, Tr2: Trace + TraceReader<Time = G::Timestamp, Diff = Diff> + 'static, Tr2::Batch: Batch, Tr2::Batcher: Batcher<Item = ((Tr1::KeyOwned, Tr1::ValOwned), G::Timestamp, Diff)>, Arranged<G, TraceAgent<Tr2>>: ArrangementSize,
Rearranges an arrangement coming from an iterative scope into an arrangement in the outer timestamp scope.
source§impl<G> Context<G>where
G: Scope<Timestamp = Product<Timestamp, PointStamp<u64>>>,
impl<G> Context<G>where G: Scope<Timestamp = Product<Timestamp, PointStamp<u64>>>,
sourcepub fn render_recursive_plan(
&mut self,
level: usize,
plan: Plan
) -> CollectionBundle<G>
pub fn render_recursive_plan( &mut self, level: usize, plan: Plan ) -> CollectionBundle<G>
Renders a plan to a differential dataflow, producing the collection of results.
This method allows for plan
to contain a LetRec
variant at its root, and is planned
in the context of level
pre-existing iteration coordinates.
This method recursively descends LetRec
nodes, establishing nested scopes for each
and establishing the appropriate recursive dependencies among the bound variables.
Once non-LetRec
nodes are reached it calls in to render_plan
which will error if
further LetRec
variants are found.
The method requires that all variables conclude with a physical representation that contains a collection (i.e. a non-arrangement), and it will panic otherwise.
source§impl<G> Context<G>where
G: Scope,
G::Timestamp: RenderTimestamp,
impl<G> Context<G>where G: Scope, G::Timestamp: RenderTimestamp,
sourcepub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<G>
pub fn render_plan(&mut self, plan: Plan) -> CollectionBundle<G>
Renders a plan to a differential dataflow, producing the collection of results.
The return type reflects the uncertainty about the data representation, perhaps as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
fn log_operator_hydration(&self, bundle: &mut CollectionBundle<G>, lir_id: u64)
fn log_operator_hydration_inner<D>( &self, stream: &Stream<G, D>, lir_id: u64 ) -> Stream<G, D>where D: Clone + 'static,
Auto Trait Implementations§
impl<S, T = Timestamp> !RefUnwindSafe for Context<S, T>
impl<S, T = Timestamp> !Send for Context<S, T>
impl<S, T = Timestamp> !Sync for Context<S, T>
impl<S, T> Unpin for Context<S, T>where S: Unpin, T: Unpin,
impl<S, T = Timestamp> !UnwindSafe for Context<S, T>
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> FmtForward for T
impl<T> FmtForward for T
source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where Self: Display,
self
to use its Display
implementation when
Debug
-formatted.source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.source§impl<T> FutureExt for T
impl<T> FutureExt for T
source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere T: ?Sized,
source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere Self: Sized,
source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere R: 'a,
self
and passes that borrow into the pipe function. Read moresource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere
Self: Borrow<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> Rwhere Self: Borrow<B>, B: 'a + ?Sized, R: 'a,
source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R
) -> Rwhere
Self: BorrowMut<B>,
B: 'a + ?Sized,
R: 'a,
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R ) -> Rwhere Self: BorrowMut<B>, B: 'a + ?Sized, R: 'a,
source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere
Self: AsRef<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> Rwhere Self: AsRef<U>, U: 'a + ?Sized, R: 'a,
self
, then passes self.as_ref()
into the pipe function.source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere
Self: AsMut<U>,
U: 'a + ?Sized,
R: 'a,
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> Rwhere Self: AsMut<U>, U: 'a + ?Sized, R: 'a,
self
, then passes self.as_mut()
into the pipe
function.source§impl<T> Pointable for T
impl<T> Pointable for T
source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere R: RustType<P>,
source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.source§impl<T> Tap for T
impl<T> Tap for T
source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,
Borrow<B>
of a value. Read moresource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,
BorrowMut<B>
of a value. Read moresource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,
AsRef<R>
view of a value. Read moresource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,
AsMut<R>
view of a value. Read moresource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere
Self: Deref<Target = T>,
T: ?Sized,
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Selfwhere Self: Deref<Target = T>, T: ?Sized,
Deref::Target
of a value. Read moresource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere
Self: DerefMut<Target = T> + Deref,
T: ?Sized,
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Selfwhere Self: DerefMut<Target = T> + Deref, T: ?Sized,
Deref::Target
of a value. Read moresource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere
Self: Borrow<B>,
B: ?Sized,
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Selfwhere Self: Borrow<B>, B: ?Sized,
.tap_borrow()
only in debug builds, and is erased in release
builds.source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere
Self: BorrowMut<B>,
B: ?Sized,
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Selfwhere Self: BorrowMut<B>, B: ?Sized,
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere
Self: AsRef<R>,
R: ?Sized,
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Selfwhere Self: AsRef<R>, R: ?Sized,
.tap_ref()
only in debug builds, and is erased in release
builds.source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere
Self: AsMut<R>,
R: ?Sized,
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Selfwhere Self: AsMut<R>, R: ?Sized,
.tap_ref_mut()
only in debug builds, and is erased in release
builds.