pub struct Context<S: Scope, T = Timestamp>{
pub(crate) scope: S,
pub debug_name: String,
pub dataflow_id: usize,
pub as_of_frontier: Antichain<T>,
pub until: Antichain<T>,
pub bindings: BTreeMap<Id, CollectionBundle<S, T>>,
pub(super) shutdown_token: ShutdownToken,
pub(super) hydration_logger: Option<HydrationLogger>,
pub(super) compute_logger: Option<Logger<ComputeEvent>>,
pub(super) linear_join_spec: LinearJoinSpec,
pub dataflow_expiration: Antichain<T>,
}
Expand description
Dataflow-local collections and arrangements.
A context means to wrap available data assets and present them in an easy-to-use manner. These assets include dataflow-local collections and arrangements, as well as imported arrangements from outside the dataflow.
Context has two timestamp types, one from S::Timestamp
and one from T
, where the
former must refine the latter. The former is the timestamp used by the scope in question,
and the latter is the timestamp of imported traces. The two may be different in the case
of regions or iteration.
Fields§
§scope: S
The scope within which all managed collections exist.
It is an error to add any collections not contained in this scope.
debug_name: String
The debug name of the dataflow associated with this context.
dataflow_id: usize
The Timely ID of the dataflow associated with this context.
as_of_frontier: Antichain<T>
Frontier before which updates should not be emitted.
We must apply it to sinks, to ensure correct outputs. We should apply it to sources and imported traces, because it improves performance.
until: Antichain<T>
Frontier after which updates should not be emitted. Used to limit the amount of work done when appropriate.
bindings: BTreeMap<Id, CollectionBundle<S, T>>
Bindings of identifiers to collections.
shutdown_token: ShutdownToken
A token that operators can probe to know whether the dataflow is shutting down.
hydration_logger: Option<HydrationLogger>
A logger that operators can use to report hydration events.
None
if no hydration events should be logged in this context.
compute_logger: Option<Logger<ComputeEvent>>
The logger, from Timely’s logging framework, if logs are enabled.
linear_join_spec: LinearJoinSpec
Specification for rendering linear joins.
dataflow_expiration: Antichain<T>
The expiration time for dataflows in this context. The output’s frontier should never advance past this frontier, except the empty frontier.
Implementations§
Source§impl<S: Scope> Context<S>
impl<S: Scope> Context<S>
Sourcepub fn for_dataflow_in<Plan>(
dataflow: &DataflowDescription<Plan, CollectionMetadata>,
scope: S,
compute_state: &ComputeState,
until: Antichain<Timestamp>,
dataflow_expiration: Antichain<Timestamp>,
) -> Self
pub fn for_dataflow_in<Plan>( dataflow: &DataflowDescription<Plan, CollectionMetadata>, scope: S, compute_state: &ComputeState, until: Antichain<Timestamp>, dataflow_expiration: Antichain<Timestamp>, ) -> Self
Creates a new empty Context.
Source§impl<S: Scope, T> Context<S, T>
impl<S: Scope, T> Context<S, T>
Sourcepub fn insert_id(
&mut self,
id: Id,
collection: CollectionBundle<S, T>,
) -> Option<CollectionBundle<S, T>>
pub fn insert_id( &mut self, id: Id, collection: CollectionBundle<S, T>, ) -> Option<CollectionBundle<S, T>>
Insert a collection bundle by an identifier.
This is expected to be used to install external collections (sources, indexes, other views),
as well as for Let
bindings of local collections.
Sourcepub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>>
pub fn remove_id(&mut self, id: Id) -> Option<CollectionBundle<S, T>>
Remove a collection bundle by an identifier.
The primary use of this method is uninstalling Let
bindings.
Sourcepub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>)
pub fn update_id(&mut self, id: Id, collection: CollectionBundle<S, T>)
Melds a collection bundle to whatever exists.
Sourcepub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>>
pub fn lookup_id(&self, id: Id) -> Option<CollectionBundle<S, T>>
Look up a collection bundle by an identifier.
pub(super) fn error_logger(&self) -> ErrorLogger
Source§impl<G> Context<G>
impl<G> Context<G>
Sourcepub fn render_flat_map(
&self,
input: CollectionBundle<G>,
func: TableFunc,
exprs: Vec<MirScalarExpr>,
mfp: MapFilterProject,
input_key: Option<Vec<MirScalarExpr>>,
) -> CollectionBundle<G>
pub fn render_flat_map( &self, input: CollectionBundle<G>, func: TableFunc, exprs: Vec<MirScalarExpr>, mfp: MapFilterProject, input_key: Option<Vec<MirScalarExpr>>, ) -> CollectionBundle<G>
Applies a TableFunc
to every row, followed by an mfp
.
Source§impl<G> Context<G>
impl<G> Context<G>
Sourcepub fn render_delta_join(
&self,
inputs: Vec<CollectionBundle<G>>,
join_plan: DeltaJoinPlan,
) -> CollectionBundle<G>
pub fn render_delta_join( &self, inputs: Vec<CollectionBundle<G>>, join_plan: DeltaJoinPlan, ) -> CollectionBundle<G>
Renders MirRelationExpr:Join
using dogs^3 delta query dataflows.
The join is followed by the application of map_filter_project
, whose
implementation will be pushed in to the join pipeline if at all possible.
Source§impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
pub(crate) fn render_join( &self, inputs: Vec<CollectionBundle<G, T>>, linear_plan: LinearJoinPlan, ) -> CollectionBundle<G, T>
fn render_join_inner( &self, inputs: Vec<CollectionBundle<G, T>>, linear_plan: LinearJoinPlan, inner: &mut Child<'_, G, <G as ScopeParent>::Timestamp>, ) -> CollectionBundle<G, T>
Sourcefn differential_join<S>(
&self,
joined: JoinedFlavor<S, T>,
lookup_relation: CollectionBundle<S, T>,
_: LinearStagePlan,
errors: &mut Vec<Collection<S, DataflowError, Diff>>,
) -> Collection<S, Row, Diff>
fn differential_join<S>( &self, joined: JoinedFlavor<S, T>, lookup_relation: CollectionBundle<S, T>, _: LinearStagePlan, errors: &mut Vec<Collection<S, DataflowError, Diff>>, ) -> Collection<S, Row, Diff>
Looks up the arrangement for the next input and joins it to the arranged version of the join of previous inputs.
Sourcefn differential_join_inner<S, Tr1, Tr2>(
&self,
prev_keyed: Arranged<S, Tr1>,
next_input: Arranged<S, Tr2>,
closure: JoinClosure,
) -> (Collection<S, Row, Diff>, Option<Collection<S, DataflowError, Diff>>)where
S: Scope<Timestamp = G::Timestamp>,
Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
for<'a> Tr1::Key<'a>: ToDatumIter,
for<'a> Tr1::Val<'a>: ToDatumIter,
for<'a> Tr2::Val<'a>: ToDatumIter,
fn differential_join_inner<S, Tr1, Tr2>(
&self,
prev_keyed: Arranged<S, Tr1>,
next_input: Arranged<S, Tr2>,
closure: JoinClosure,
) -> (Collection<S, Row, Diff>, Option<Collection<S, DataflowError, Diff>>)where
S: Scope<Timestamp = G::Timestamp>,
Tr1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
Tr2: for<'a> TraceReader<Key<'a> = Tr1::Key<'a>, Time = G::Timestamp, Diff = Diff> + Clone + 'static,
for<'a> Tr1::Key<'a>: ToDatumIter,
for<'a> Tr1::Val<'a>: ToDatumIter,
for<'a> Tr2::Val<'a>: ToDatumIter,
Joins the arrangement for next_input
to the arranged version of the
join of previous inputs. This is split into its own method to enable
reuse of code with different types of next_input
.
The return type includes an optional error collection, which may be
None
if we can determine that closure
cannot error.
Source§impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
Sourcepub fn render_reduce(
&self,
input: CollectionBundle<G, T>,
key_val_plan: KeyValPlan,
reduce_plan: ReducePlan,
input_key: Option<Vec<MirScalarExpr>>,
mfp_after: Option<MapFilterProject>,
) -> CollectionBundle<G, T>
pub fn render_reduce( &self, input: CollectionBundle<G, T>, key_val_plan: KeyValPlan, reduce_plan: ReducePlan, input_key: Option<Vec<MirScalarExpr>>, mfp_after: Option<MapFilterProject>, ) -> CollectionBundle<G, T>
Renders a MirRelationExpr::Reduce
using various non-obvious techniques to
minimize worst-case incremental update times and memory footprint.
Sourcefn render_reduce_plan<S>(
&self,
plan: ReducePlan,
collection: Collection<S, (Row, Row), Diff>,
err_input: Collection<S, DataflowError, Diff>,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>,
) -> CollectionBundle<S, T>
fn render_reduce_plan<S>( &self, plan: ReducePlan, collection: Collection<S, (Row, Row), Diff>, err_input: Collection<S, DataflowError, Diff>, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> CollectionBundle<S, T>
Render a dataflow based on the provided plan.
The output will be an arrangements that looks the same as if we just had a single reduce operator computing everything together, and this arrangement can also be re-used.
fn render_reduce_plan_inner<S>( &self, plan: ReducePlan, collection: Collection<S, (Row, Row), Diff>, errors: &mut Vec<Collection<S, DataflowError, Diff>>, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> MzArrangement<S>
Sourcefn build_collation<S>(
&self,
arrangements: Vec<(ReductionType, Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>)>,
aggregate_types: Vec<ReductionType>,
scope: &mut S,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
fn build_collation<S>( &self, arrangements: Vec<(ReductionType, Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>)>, aggregate_types: Vec<ReductionType>, scope: &mut S, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
Build the dataflow to combine arrangements containing results of different aggregation types into a single arrangement.
This computes the same thing as a join on the group key followed by shuffling the values into the correct order. This implementation assumes that all input arrangements present values in a way that respects the desired output order, so we can do a linear merge to form the output.
fn dispatch_build_distinct<S>( &self, collection: Collection<S, (Row, Row), Diff>, mfp_after: Option<SafeMfpPlan>, ) -> (MzArrangement<S>, Collection<S, DataflowError, Diff>)
Sourcefn build_distinct<Ba1, Bu1, T1, Bu2, T2, S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
tag: &str,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<T1>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
for<'a> T1: Trace<Val<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static,
for<'a, 'a> T1::Key<'a>: IntoOwned<'a, Owned = Row> + Debug + ToDatumIter,
T1::Batch: Batch,
Ba1: Batcher<Input = Vec<((Row, Row), G::Timestamp, Diff)>, Time = G::Timestamp> + 'static,
Ba1::Output: Container + PushInto<((Row, Row), T1::Time, T1::Diff)>,
Bu1: Builder<Time = G::Timestamp, Input = Ba1::Output, Output = T1::Batch>,
Arranged<S, TraceAgent<T1>>: ArrangementSize,
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Val<'a> = &'a DataflowError, Time = G::Timestamp, Diff = Diff> + 'static,
T2::Batch: Batch,
Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
Bu2::Input: Container + PushInto<((Row, DataflowError), T2::Time, T2::Diff)>,
Arranged<S, TraceAgent<T2>>: ArrangementSize,
fn build_distinct<Ba1, Bu1, T1, Bu2, T2, S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
tag: &str,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<T1>>, Collection<S, DataflowError, Diff>)where
S: Scope<Timestamp = G::Timestamp>,
for<'a> T1: Trace<Val<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static,
for<'a, 'a> T1::Key<'a>: IntoOwned<'a, Owned = Row> + Debug + ToDatumIter,
T1::Batch: Batch,
Ba1: Batcher<Input = Vec<((Row, Row), G::Timestamp, Diff)>, Time = G::Timestamp> + 'static,
Ba1::Output: Container + PushInto<((Row, Row), T1::Time, T1::Diff)>,
Bu1: Builder<Time = G::Timestamp, Input = Ba1::Output, Output = T1::Batch>,
Arranged<S, TraceAgent<T1>>: ArrangementSize,
T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Val<'a> = &'a DataflowError, Time = G::Timestamp, Diff = Diff> + 'static,
T2::Batch: Batch,
Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
Bu2::Input: Container + PushInto<((Row, DataflowError), T2::Time, T2::Diff)>,
Arranged<S, TraceAgent<T2>>: ArrangementSize,
Build the dataflow to compute the set of distinct keys.
Sourcefn build_basic_aggregates<S>(
&self,
input: Collection<S, (Row, Row), Diff>,
aggrs: Vec<(usize, AggregateExpr)>,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
fn build_basic_aggregates<S>( &self, input: Collection<S, (Row, Row), Diff>, aggrs: Vec<(usize, AggregateExpr)>, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
Build the dataflow to compute and arrange multiple non-accumulable,
non-hierarchical aggregations on input
.
This function assumes that we are explicitly rendering multiple basic aggregations.
For each aggregate, we render a different reduce operator, and then fuse
results together into a final arrangement that presents all the results
in the order specified by aggrs
.
Sourcefn build_basic_aggregate<S>(
&self,
input: Collection<S, (Row, Row), Diff>,
index: usize,
aggr: &AggregateExpr,
validating: bool,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>,
fused_unnest_list: bool,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Option<Collection<S, DataflowError, Diff>>)
fn build_basic_aggregate<S>( &self, input: Collection<S, (Row, Row), Diff>, index: usize, aggr: &AggregateExpr, validating: bool, key_arity: usize, mfp_after: Option<SafeMfpPlan>, fused_unnest_list: bool, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Option<Collection<S, DataflowError, Diff>>)
Build the dataflow to compute a single basic aggregation.
This method also applies distinctness if required.
fn build_reduce_inaccumulable_distinct<S, V, Bu, Tr>(
&self,
input: Collection<S, Row, Diff>,
name_tag: Option<&str>,
) -> Arranged<S, TraceAgent<Tr>>where
S: Scope<Timestamp = G::Timestamp>,
V: MaybeValidatingRow<(), String>,
Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static,
Tr::Batch: Batch,
Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = Row>,
for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
Arranged<S, TraceAgent<Tr>>: ArrangementSize,
Sourcefn build_bucketed<S>(
&self,
input: Collection<S, (Row, Row), Diff>,
_: BucketedPlan,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
fn build_bucketed<S>( &self, input: Collection<S, (Row, Row), Diff>, _: BucketedPlan, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
Build the dataflow to compute and arrange multiple hierarchical aggregations on non-monotonic inputs.
This function renders a single reduction tree that computes aggregations with a priority queue implemented with a series of reduce operators that partition the input into buckets, and compute the aggregation over very small buckets and feed the results up to larger buckets.
Note that this implementation currently ignores the distinct bit because we currently only perform min / max hierarchically and the reduction tree efficiently suppresses non-distinct updates.
buckets
indicates the number of buckets in this stage. We do some non-obvious
trickery here to limit the memory usage per layer by internally
holding only the elements that were rejected by this stage. However, the
output collection maintains the ((key, bucket), (passing value)
for this
stage.
Sourcefn build_bucketed_stage<S>(
&self,
aggr_funcs: &Vec<AggregateFunc>,
input: &Collection<S, (Row, Row), Diff>,
validating: bool,
) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)
fn build_bucketed_stage<S>( &self, aggr_funcs: &Vec<AggregateFunc>, input: &Collection<S, (Row, Row), Diff>, validating: bool, ) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)
Build a bucketed stage fragment that wraps Self::build_bucketed_negated_output
, and
adds validation if validating
is true. It returns the consolidated inputs concatenated
with the negation of what’s produced by the reduction.
validating
indicates whether we want this stage to perform error detection
for invalid accumulations. Once a stage is clean of such errors, subsequent
stages can skip validation.
Sourcefn build_bucketed_negated_output<S, V, Bu, Tr>(
&self,
input: &Collection<S, (Row, Row), Diff>,
aggrs: Vec<AggregateFunc>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), G::Timestamp, Diff)>>>>>>, Arranged<S, TraceAgent<Tr>>)where
S: Scope<Timestamp = G::Timestamp>,
V: MaybeValidatingRow<Row, Row>,
Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static,
Tr::Batch: Batch,
Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
Arranged<S, TraceAgent<Tr>>: ArrangementSize,
fn build_bucketed_negated_output<S, V, Bu, Tr>(
&self,
input: &Collection<S, (Row, Row), Diff>,
aggrs: Vec<AggregateFunc>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), G::Timestamp, Diff)>>>>>>, Arranged<S, TraceAgent<Tr>>)where
S: Scope<Timestamp = G::Timestamp>,
V: MaybeValidatingRow<Row, Row>,
Tr: Trace + for<'a> TraceReader<Key<'a> = DatumSeq<'a>, Time = G::Timestamp, Diff = Diff> + 'static,
Tr::Batch: Batch,
Bu: Builder<Time = G::Timestamp, Output = Tr::Batch>,
Bu::Input: Container + PushInto<((Row, V), Tr::Time, Tr::Diff)>,
for<'a> Tr::Val<'a>: IntoOwned<'a, Owned = V>,
Arranged<S, TraceAgent<Tr>>: ArrangementSize,
Build a dataflow fragment for one stage of a reduction tree for multiple hierarchical aggregates to arrange and reduce the inputs. Returns the arranged input and the reduction, with all diffs in the reduction’s output negated.
Sourcefn build_monotonic<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
_: MonotonicPlan,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
fn build_monotonic<S>( &self, collection: Collection<S, (Row, Row), Diff>, _: MonotonicPlan, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
Build the dataflow to compute and arrange multiple hierarchical aggregations on monotonic inputs.
Sourcefn build_accumulable<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
_: AccumulablePlan,
key_arity: usize,
mfp_after: Option<SafeMfpPlan>,
) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
fn build_accumulable<S>( &self, collection: Collection<S, (Row, Row), Diff>, _: AccumulablePlan, key_arity: usize, mfp_after: Option<SafeMfpPlan>, ) -> (Arranged<S, TraceAgent<Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), <S as ScopeParent>::Timestamp, Diff)>>>>>>, Collection<S, DataflowError, Diff>)
Build the dataflow to compute and arrange multiple accumulable aggregations.
The incoming values are moved to the update’s “difference” field, at which point
they can be accumulated in place. The count
operator promotes the accumulated
values to data, at which point a final map applies operator-specific logic to
yield the final aggregate.
Source§impl<'g, G, T> Context<Child<'g, G, T>>
impl<'g, G, T> Context<Child<'g, G, T>>
Sourcepub(crate) fn export_sink(
&self,
compute_state: &mut ComputeState,
tokens: &BTreeMap<GlobalId, Rc<dyn Any>>,
dependency_ids: BTreeSet<GlobalId>,
sink_id: GlobalId,
sink: &ComputeSinkDesc<CollectionMetadata>,
start_signal: StartSignal,
ct_times: Option<Collection<G, (), Diff>>,
)
pub(crate) fn export_sink( &self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, sink_id: GlobalId, sink: &ComputeSinkDesc<CollectionMetadata>, start_signal: StartSignal, ct_times: Option<Collection<G, (), Diff>>, )
Export the sink described by sink
from the rendering context.
Source§impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
impl<G, T> Context<G, T>where
G: Scope,
G::Timestamp: Lattice + Refines<T> + Columnation,
T: Timestamp + Lattice + Columnation,
pub(crate) fn render_threshold( &self, input: CollectionBundle<G, T>, threshold_plan: ThresholdPlan, ) -> CollectionBundle<G, T>
Source§impl<G> Context<G>
impl<G> Context<G>
pub(crate) fn render_topk( &self, input: CollectionBundle<G>, top_k_plan: TopKPlan, ) -> CollectionBundle<G>
Sourcefn build_topk<S>(
&self,
collection: Collection<S, Row, Diff>,
group_key: Vec<usize>,
order_key: Vec<ColumnOrder>,
offset: usize,
limit: Option<MirScalarExpr>,
arity: usize,
buckets: Vec<u64>,
) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
fn build_topk<S>( &self, collection: Collection<S, Row, Diff>, group_key: Vec<usize>, order_key: Vec<ColumnOrder>, offset: usize, limit: Option<MirScalarExpr>, arity: usize, buckets: Vec<u64>, ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
Constructs a TopK dataflow subgraph.
Sourcefn build_topk_stage<S>(
&self,
collection: Collection<S, (Row, Row), Diff>,
order_key: Vec<ColumnOrder>,
modulus: u64,
offset: usize,
limit: Option<MirScalarExpr>,
arity: usize,
validating: bool,
) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)
fn build_topk_stage<S>( &self, collection: Collection<S, (Row, Row), Diff>, order_key: Vec<ColumnOrder>, modulus: u64, offset: usize, limit: Option<MirScalarExpr>, arity: usize, validating: bool, ) -> (Collection<S, (Row, Row), Diff>, Option<Collection<S, DataflowError, Diff>>)
To provide a robust incremental orderby-limit experience, we want to avoid grouping all records (or even large groups) and then applying the ordering and limit. Instead, a more robust approach forms groups of bounded size and applies the offset and limit to each, and then increases the sizes of the groups.
Builds a “stage”, which uses a finer grouping than is required to reduce the volume of updates, and to reduce the amount of work on the critical path for updates. The cost is a larger number of arrangements when this optimization does nothing beneficial.
The function accepts a collection of the form (hash_key, row)
, a modulus it applies to the
hash_key
’s hash datum, an offset
for returning results, and a limit
to restrict the
output size. arity
represents the number of columns in the input data, and
if validating
is true, we check for negative multiplicities, which indicate
an error in the input data.
The output of this function is not consolidated.
The dataflow fragment has the following shape:
| input
|
arrange
|\
| \
| reduce
| |
concat
|
| output
There are additional map/flat_map operators as well as error demuxing operators, but we’re omitting them here for the sake of simplicity.
fn render_top1_monotonic<S>( &self, collection: Collection<S, Row, Diff>, group_key: Vec<usize>, order_key: Vec<ColumnOrder>, must_consolidate: bool, ) -> (Collection<S, Row, Diff>, Collection<S, DataflowError, Diff>)
Source§impl<'g, G, T> Context<Child<'g, G, T>>
impl<'g, G, T> Context<Child<'g, G, T>>
pub(crate) fn import_index( &mut self, compute_state: &mut ComputeState, tokens: &mut BTreeMap<GlobalId, Rc<dyn Any>>, input_probe: Handle<Timestamp>, idx_id: GlobalId, idx: &IndexDesc, start_signal: StartSignal, )
Source§impl<'g, G, T> Context<Child<'g, G, T>>
impl<'g, G, T> Context<Child<'g, G, T>>
pub(crate) fn export_index_iterative( &self, compute_state: &mut ComputeState, tokens: &BTreeMap<GlobalId, Rc<dyn Any>>, dependency_ids: BTreeSet<GlobalId>, idx_id: GlobalId, idx: &IndexDesc, )
Sourcefn dispatch_rearrange_iterative(
&self,
oks: MzArrangement<Child<'g, G, T>>,
name: &str,
) -> MzArrangement<G>
fn dispatch_rearrange_iterative( &self, oks: MzArrangement<Child<'g, G, T>>, name: &str, ) -> MzArrangement<G>
Dispatches the rearranging of an arrangement coming from an iterative scope according to specialized key-value arrangement types.
Source§impl<G> Context<G>
impl<G> Context<G>
Sourcepub fn render_recursive_plan(
&mut self,
object_id: GlobalId,
level: usize,
plan: RenderPlan,
) -> CollectionBundle<G>
pub fn render_recursive_plan( &mut self, object_id: GlobalId, level: usize, plan: RenderPlan, ) -> CollectionBundle<G>
Renders a plan to a differential dataflow, producing the collection of results.
This method allows for plan
to contain RecBind
s, and is planned
in the context of level
pre-existing iteration coordinates.
This method recursively descends RecBind
values, establishing nested scopes for each
and establishing the appropriate recursive dependencies among the bound variables.
Once all RecBind
s have been rendered it calls in to render_plan
which will error if
further RecBind
s are found.
The method requires that all variables conclude with a physical representation that contains a collection (i.e. a non-arrangement), and it will panic otherwise.
Source§impl<G> Context<G>
impl<G> Context<G>
Sourcepub fn render_plan(
&mut self,
object_id: GlobalId,
plan: RenderPlan,
) -> CollectionBundle<G>
pub fn render_plan( &mut self, object_id: GlobalId, plan: RenderPlan, ) -> CollectionBundle<G>
Renders a non-recursive plan to a differential dataflow, producing the collection of results.
The return type reflects the uncertainty about the data representation, perhaps as a stream of data, perhaps as an arrangement, perhaps as a stream of batches.
§Panics
Panics if the given plan contains any RecBind
s. Recursive plans must be rendered using
render_recursive_plan
instead.
Sourcefn render_letfree_plan(
&mut self,
object_id: GlobalId,
plan: LetFreePlan,
) -> CollectionBundle<G>
fn render_letfree_plan( &mut self, object_id: GlobalId, plan: LetFreePlan, ) -> CollectionBundle<G>
Renders a let-free plan to a differential dataflow, producing the collection of results.
Sourcefn render_plan_expr(
&mut self,
expr: Expr,
collections: &BTreeMap<LirId, CollectionBundle<G>>,
) -> CollectionBundle<G>
fn render_plan_expr( &mut self, expr: Expr, collections: &BTreeMap<LirId, CollectionBundle<G>>, ) -> CollectionBundle<G>
Renders a render_plan::Expr
, producing the collection of results.
§Panics
Panics if any of the expr’s inputs is not found in collections
.
Callers must ensure that input nodes have been rendered previously.
fn log_dataflow_global_id(&self, id: usize, global_id: GlobalId)
fn log_lir_mapping( &self, global_id: GlobalId, mapping: Box<[(LirId, LirMetadata)]>, )
fn log_operator_hydration( &self, bundle: &mut CollectionBundle<G>, lir_id: LirId, )
fn log_operator_hydration_inner<D>(
&self,
stream: &Stream<G, D>,
lir_id: LirId,
) -> Stream<G, D>where
D: Clone + 'static,
Auto Trait Implementations§
impl<S, T> Freeze for Context<S, T>
impl<S, T = Timestamp> !RefUnwindSafe for Context<S, T>
impl<S, T = Timestamp> !Send for Context<S, T>
impl<S, T = Timestamp> !Sync for Context<S, T>
impl<S, T> Unpin for Context<S, T>
impl<S, T = Timestamp> !UnwindSafe for Context<S, T>
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
Source§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
Source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto
.Source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto
.Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign
, for types that do not implement AddAssign
.Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.