pub struct DataflowBuilder { /* private fields */ }Expand description
Builds a compute dataflow from generic parts, hiding the lowering and persist wiring mechanism.
§Contract
By default the caller supplies MIR and the builder lowers it faithfully,
attaching the persist wiring without optimizing — so a hand-built minimal plan
lowers exactly as written. Optimization — fusion, predicate pushdown, and notably
join-implementation selection — is opt-in via Self::optimize, paid for only
by callers that need it. A Join whose implementation is left Unimplemented
is rejected by the LIR lowering, so a plan containing one requires optimize,
which runs mz_transform::optimize_dataflow to fill the implementation first.
When optimizing, the builder hands the optimizer an index oracle built from its
own index_imports (ImportedIndexOracle), so imported arrangements are
recognized — the same index information environmentd’s catalog oracle would
supply for these imports.
§Construction strategy
The builder deliberately does not hand-roll the RenderPlan: the LirIds
used to stitch nodes together have no public constructor, and the LetFreePlan
invariants (notably a valid topological_order) are easy to get wrong. Instead
it mirrors exactly what the real compute controller does:
- Accumulate a MIR-level
DataflowDescription<OptimizedMirRelationExpr, ()>using the sameimport_source/insert_plan/export_indexhelpers the optimizer uses. - Lower it to LIR via
Plan::finalize_dataflow, yieldingDataflowDescription<Plan, ()>. - Augment it into
DataflowDescription<RenderPlan, CollectionMetadata>by converting each object’sPlanviaRenderPlan::try_fromand attaching the storageCollectionMetadatato each source import — the same step performed incompute-client’sInstance::create_dataflow.
This guarantees the emitted plan is structurally identical to one produced by a
live environmentd, at the cost of running the (cheap, deterministic) lowering
in-process.
Implementations§
Source§impl DataflowBuilder
impl DataflowBuilder
Sourcepub fn new(name: impl Into<String>) -> Self
pub fn new(name: impl Into<String>) -> Self
Start an empty builder. name becomes the dataflow’s debug name.
Sourcepub fn import_persist(&mut self, id: GlobalId, source: PersistSource) -> Input
pub fn import_persist(&mut self, id: GlobalId, source: PersistSource) -> Input
Import a persist-backed storage collection as id.
Registers the source on the MIR description and records the persist metadata
for the augment step. Returns an Input handle whose Input::get yields
a correctly typed Get node, so callers never construct a ReprRelationType
by hand.
Sourcepub fn import_index(
&mut self,
index_id: GlobalId,
on_id: GlobalId,
key_cols: Vec<usize>,
on_type: ReprRelationType,
monotonic: bool,
) -> Input
pub fn import_index( &mut self, index_id: GlobalId, on_id: GlobalId, key_cols: Vec<usize>, on_type: ReprRelationType, monotonic: bool, ) -> Input
Import a previously-exported index, making the collection it arranges
(on_id) available to this dataflow as an in-memory arrangement.
Unlike Self::import_persist, this imports no storage collection: the
arrangement is served from the replica’s existing, hydrated index, so the
dataflow needs no CollectionMetadata and the augment step leaves the
index import untouched. The MIR-to-LIR lowering registers the imported
arrangement under Get(on_id) automatically, so a faithful (unoptimized)
Get(on_id) picks it up. Returns an Input referencing on_id — the
id a computation Gets, not the index id itself.
Sourcepub fn get(&self, id: GlobalId) -> Result<MirRelationExpr>
pub fn get(&self, id: GlobalId) -> Result<MirRelationExpr>
A typed Get of an already-imported or built id, for callers that
assemble MIR by id rather than threading Input handles — notably the
JSON MIR translator. Errors if id was never imported or built, so a bad
reference surfaces cleanly instead of constructing an ill-typed Get.
Sourcepub fn build(&mut self, id: GlobalId, expr: MirRelationExpr) -> &mut Self
pub fn build(&mut self, id: GlobalId, expr: MirRelationExpr) -> &mut Self
Insert a MIR object to compute, bound to id.
expr is wrapped via OptimizedMirRelationExpr::declare_optimized; the
caller is responsible for any optimization (see the type-level contract). The
object’s relation type is recorded so a later Self::export_index over id
can derive its on_type.
Sourcepub fn export_index(
&mut self,
index_id: GlobalId,
on_id: GlobalId,
key_cols: Vec<usize>,
) -> &mut Self
pub fn export_index( &mut self, index_id: GlobalId, on_id: GlobalId, key_cols: Vec<usize>, ) -> &mut Self
Export an index index_id arranging on_id by key_cols.
on_id may be an imported source or a built object; either way the lowering
synthesizes the ArrangeBy. The on_type is derived from the referenced id,
which must have been imported or built first.
Sourcepub fn export_materialized_view(
&mut self,
sink_id: GlobalId,
from_id: GlobalId,
value_desc: RelationDesc,
target: PersistSink,
) -> &mut Self
pub fn export_materialized_view( &mut self, sink_id: GlobalId, from_id: GlobalId, value_desc: RelationDesc, target: PersistSink, ) -> &mut Self
Export a materialized-view persist sink sink_id writing the collection
from_id to a target persist shard (a materialized view).
value_desc is the output relation schema; it must match from_id’s type
(validated by the caller). The target shard is identified by target, whose
CollectionMetadata the augment step splices into the sink connection — the
compute persist sink opens it as SourceData/()/Timestamp/StorageDiff, the
same codec a storage collection uses, so the shard reads back like any other.
up_to is always the empty antichain: the persist sink does not implement
UP TO (it panics during rendering otherwise), and the real optimizer
likewise leaves a materialized view’s up_to empty — it is a subscribe-only
concept.
Sourcepub fn export_subscribe(
&mut self,
sink_id: GlobalId,
from_id: GlobalId,
value_desc: RelationDesc,
up_to: Antichain<Timestamp>,
) -> &mut Self
pub fn export_subscribe( &mut self, sink_id: GlobalId, from_id: GlobalId, value_desc: RelationDesc, up_to: Antichain<Timestamp>, ) -> &mut Self
Export a subscribe sink sink_id streaming changes of the collection
from_id back as ComputeResponse::SubscribeResponse batches.
Unlike a materialized view, a subscribe writes no shard, so it needs no
storage metadata. value_desc is the output schema (must match from_id’s
type); up_to is the exclusive upper at which the subscribe completes. The
empty output ordering leaves intra-timestamp order unconstrained — the
driver consolidates and sorts the updates for a deterministic golden.
Sourcepub fn as_of(&mut self, t: Timestamp) -> &mut Self
pub fn as_of(&mut self, t: Timestamp) -> &mut Self
Set the dataflow’s as_of (the read frontier hydration starts from).
Sourcepub fn until(&mut self, t: Timestamp) -> &mut Self
pub fn until(&mut self, t: Timestamp) -> &mut Self
Set the dataflow’s until (the exclusive upper bound past which output is
dropped). Defaults to the empty antichain (no bound).
Sourcepub fn optimize(&mut self) -> &mut Self
pub fn optimize(&mut self) -> &mut Self
Run the MIR dataflow optimizer in Self::finish before lowering.
Off by default: the builder otherwise lowers the caller’s MIR faithfully (the
contract above). Enable it for plans that don’t lower from raw MIR — notably a
Join, whose implementation defaults to Unimplemented and is rejected by
the LIR lowering until mz_transform::optimize_dataflow’s JoinImplementation
fills it in — or to reproduce the plan environmentd would ship for a logical
expression rather than the literal one written.
Sourcepub fn finish(
self,
) -> Result<DataflowDescription<RenderPlan, CollectionMetadata>>
pub fn finish( self, ) -> Result<DataflowDescription<RenderPlan, CollectionMetadata>>
Lower the accumulated MIR and attach persist wiring, producing the
DataflowDescription the compute protocol expects.
Returns an error rather than panicking on a malformed plan (e.g. a key column out of range, or an unbalanced object graph), so a caller driving this from external input — notably the script reader — can surface a clean error instead of crashing the process.
Auto Trait Implementations§
impl Freeze for DataflowBuilder
impl RefUnwindSafe for DataflowBuilder
impl Send for DataflowBuilder
impl Sync for DataflowBuilder
impl Unpin for DataflowBuilder
impl UnsafeUnpin for DataflowBuilder
impl UnwindSafe for DataflowBuilder
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Downcast for T
impl<T> Downcast for T
Source§impl<T> FmtForward for T
impl<T> FmtForward for T
Source§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self to use its Binary implementation when Debug-formatted.Source§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self to use its Display implementation when
Debug-formatted.Source§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self to use its LowerExp implementation when
Debug-formatted.Source§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self to use its LowerHex implementation when
Debug-formatted.Source§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self to use its Octal implementation when Debug-formatted.Source§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self to use its Pointer implementation when
Debug-formatted.Source§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self to use its UpperExp implementation when
Debug-formatted.Source§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self to use its UpperHex implementation when
Debug-formatted.Source§impl<T> FutureExt for T
impl<T> FutureExt for T
Source§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
Source§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T in a tonic::RequestSource§impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
impl<T, U> OverrideFrom<Option<&T>> for Uwhere
U: OverrideFrom<T>,
Source§impl<T> Paint for Twhere
T: ?Sized,
impl<T> Paint for Twhere
T: ?Sized,
Source§fn fg(&self, value: Color) -> Painted<&T>
fn fg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the foreground set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like red() and
green(), which have the same functionality but are
pithier.
§Example
Set foreground color to white using fg():
use yansi::{Paint, Color};
painted.fg(Color::White);Set foreground color to white using white().
use yansi::Paint;
painted.white();Source§fn bright_black(&self) -> Painted<&T>
fn bright_black(&self) -> Painted<&T>
Source§fn bright_red(&self) -> Painted<&T>
fn bright_red(&self) -> Painted<&T>
Source§fn bright_green(&self) -> Painted<&T>
fn bright_green(&self) -> Painted<&T>
Source§fn bright_yellow(&self) -> Painted<&T>
fn bright_yellow(&self) -> Painted<&T>
Source§fn bright_blue(&self) -> Painted<&T>
fn bright_blue(&self) -> Painted<&T>
Source§fn bright_magenta(&self) -> Painted<&T>
fn bright_magenta(&self) -> Painted<&T>
Source§fn bright_cyan(&self) -> Painted<&T>
fn bright_cyan(&self) -> Painted<&T>
Source§fn bright_white(&self) -> Painted<&T>
fn bright_white(&self) -> Painted<&T>
Source§fn bg(&self, value: Color) -> Painted<&T>
fn bg(&self, value: Color) -> Painted<&T>
Returns a styled value derived from self with the background set to
value.
This method should be used rarely. Instead, prefer to use color-specific
builder methods like on_red() and
on_green(), which have the same functionality but
are pithier.
§Example
Set background color to red using fg():
use yansi::{Paint, Color};
painted.bg(Color::Red);Set background color to red using on_red().
use yansi::Paint;
painted.on_red();Source§fn on_primary(&self) -> Painted<&T>
fn on_primary(&self) -> Painted<&T>
Source§fn on_magenta(&self) -> Painted<&T>
fn on_magenta(&self) -> Painted<&T>
Source§fn on_bright_black(&self) -> Painted<&T>
fn on_bright_black(&self) -> Painted<&T>
Source§fn on_bright_red(&self) -> Painted<&T>
fn on_bright_red(&self) -> Painted<&T>
Source§fn on_bright_green(&self) -> Painted<&T>
fn on_bright_green(&self) -> Painted<&T>
Source§fn on_bright_yellow(&self) -> Painted<&T>
fn on_bright_yellow(&self) -> Painted<&T>
Source§fn on_bright_blue(&self) -> Painted<&T>
fn on_bright_blue(&self) -> Painted<&T>
Source§fn on_bright_magenta(&self) -> Painted<&T>
fn on_bright_magenta(&self) -> Painted<&T>
Source§fn on_bright_cyan(&self) -> Painted<&T>
fn on_bright_cyan(&self) -> Painted<&T>
Source§fn on_bright_white(&self) -> Painted<&T>
fn on_bright_white(&self) -> Painted<&T>
Source§fn attr(&self, value: Attribute) -> Painted<&T>
fn attr(&self, value: Attribute) -> Painted<&T>
Enables the styling Attribute value.
This method should be used rarely. Instead, prefer to use
attribute-specific builder methods like bold() and
underline(), which have the same functionality
but are pithier.
§Example
Make text bold using attr():
use yansi::{Paint, Attribute};
painted.attr(Attribute::Bold);Make text bold using using bold().
use yansi::Paint;
painted.bold();Source§fn rapid_blink(&self) -> Painted<&T>
fn rapid_blink(&self) -> Painted<&T>
Source§fn quirk(&self, value: Quirk) -> Painted<&T>
fn quirk(&self, value: Quirk) -> Painted<&T>
Enables the yansi Quirk value.
This method should be used rarely. Instead, prefer to use quirk-specific
builder methods like mask() and
wrap(), which have the same functionality but are
pithier.
§Example
Enable wrapping using .quirk():
use yansi::{Paint, Quirk};
painted.quirk(Quirk::Wrap);Enable wrapping using wrap().
use yansi::Paint;
painted.wrap();Source§fn clear(&self) -> Painted<&T>
👎Deprecated since 1.0.1: renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
fn clear(&self) -> Painted<&T>
renamed to resetting() due to conflicts with Vec::clear().
The clear() method will be removed in a future release.
Source§fn whenever(&self, value: Condition) -> Painted<&T>
fn whenever(&self, value: Condition) -> Painted<&T>
Conditionally enable styling based on whether the Condition value
applies. Replaces any previous condition.
See the crate level docs for more details.
§Example
Enable styling painted only when both stdout and stderr are TTYs:
use yansi::{Paint, Condition};
painted.red().on_yellow().whenever(Condition::STDOUTERR_ARE_TTY);Source§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
Source§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
Source§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self and passes that borrow into the pipe function. Read moreSource§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
Source§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
Source§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self, then passes self.as_ref() into the pipe function.Source§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self, then passes self.as_mut() into the pipe
function.Source§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self, then passes self.deref() into the pipe function.Source§impl<T> Pointable for T
impl<T> Pointable for T
Source§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
impl<P, R> ProtoType<R> for Pwhere
R: RustType<P>,
Source§fn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
RustType::from_proto.Source§fn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
RustType::into_proto.Source§impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
impl<'a, S, T> Semigroup<&'a S> for Twhere
T: Semigroup<S>,
Source§fn plus_equals(&mut self, rhs: &&'a S)
fn plus_equals(&mut self, rhs: &&'a S)
std::ops::AddAssign, for types that do not implement AddAssign.Source§impl<T> ServiceExt for T
impl<T> ServiceExt for T
Source§fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
Source§fn decompression(self) -> Decompression<Self>where
Self: Sized,
fn decompression(self) -> Decompression<Self>where
Self: Sized,
Source§fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
Source§fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
Source§fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
Source§impl<T> Tap for T
impl<T> Tap for T
Source§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B> of a value. Read moreSource§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B> of a value. Read moreSource§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R> view of a value. Read moreSource§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R> view of a value. Read moreSource§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target of a value. Read moreSource§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap() only in debug builds, and is erased in release builds.Source§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow() only in debug builds, and is erased in release
builds.Source§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut() only in debug builds, and is erased in release
builds.Source§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref() only in debug builds, and is erased in release
builds.Source§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut() only in debug builds, and is erased in release
builds.Source§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref() only in debug builds, and is erased in release
builds.