Skip to main content

mz_clusterd_test_driver/
dataflow.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Assembly of compute [`DataflowDescription`]s for the headless test driver.
11//!
12//! [`DataflowBuilder`] is the generic boundary between tests and the dataflow
13//! assembly mechanism. A test describes its dataflow in terms of persist imports,
14//! MIR objects to compute, and index exports; the builder owns the parts that are
15//! hard and reusable — the MIR-to-LIR lowering, the [`RenderPlan`] conversion, the
16//! [`CollectionMetadata`] attachment, and the `SqlRelationType`-versus-
17//! `ReprRelationType` bookkeeping — and produces a
18//! `DataflowDescription<RenderPlan, CollectionMetadata>` ready to ship as
19//! [`ComputeCommand::CreateDataflow`].
20//!
21//! [`index_dataflow`] is thin sugar over the builder for the common single-index
22//! shape.
23//!
24//! [`ComputeCommand::CreateDataflow`]: mz_compute_client::protocol::command::ComputeCommand::CreateDataflow
25
26use std::collections::BTreeMap;
27
28use mz_compute_types::dataflows::{
29    BuildDesc, DataflowDescription, IndexDesc, IndexImport, SourceImport,
30};
31use mz_compute_types::plan::Plan;
32use mz_compute_types::plan::render_plan::RenderPlan;
33use mz_compute_types::sinks::{
34    ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection, SubscribeSinkConnection,
35};
36use mz_compute_types::sources::SourceInstanceDesc;
37use mz_expr::{
38    AggregateExpr, AggregateFunc, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
39};
40use mz_persist_types::{PersistLocation, ShardId};
41use mz_repr::optimize::OptimizerFeatures;
42use mz_repr::{GlobalId, RelationDesc, ReprRelationType, Timestamp};
43use mz_storage_types::controller::CollectionMetadata;
44use mz_transform::dataflow::DataflowMetainfo;
45use mz_transform::typecheck::empty_typechecking_context;
46use mz_transform::{EmptyStatisticsOracle, IndexOracle, TransformCtx, optimize_dataflow};
47use timely::progress::Antichain;
48
/// A persist-backed storage collection to import into a dataflow.
///
/// Passed to [`DataflowBuilder::import_persist`], which registers the collection
/// on the dataflow and retains this value so the storage metadata can be attached
/// to the source import after the plan has been lowered.
///
/// `upper` is the exclusive upper bound of the shard's written data (the next
/// timestamp after the last written one): for data written at a single timestamp
/// `t`, pass `t + 1`; for data spread across `0..n_ts`, pass `n_ts`. The compute
/// instance uses it to know when the source's data is fully available.
#[derive(Clone, Debug)]
pub struct PersistSource {
    /// The data shard backing the collection.
    pub shard: ShardId,
    /// The persist location (blob + consensus) the shard lives in.
    pub location: PersistLocation,
    /// The relation schema of the collection. Both the SQL-level relation type
    /// handed to the source import and the repr-level type carried by `Get`s of
    /// the collection are derived from it.
    pub desc: RelationDesc,
    /// The exclusive upper bound of the shard's written data. It becomes the
    /// `upper` antichain of the corresponding source import.
    pub upper: Timestamp,
}
66
/// A persist-backed target shard for a materialized-view sink to write to.
///
/// Consumed by [`DataflowBuilder::export_materialized_view`], which combines it
/// with the sink's output schema into the [`CollectionMetadata`] of the target
/// collection.
#[derive(Clone, Debug)]
pub struct PersistSink {
    /// The data shard the sink writes its output to.
    pub shard: ShardId,
    /// The persist location (blob + consensus) the shard lives in.
    pub location: PersistLocation,
}
75
/// An [`IndexOracle`] over a dataflow's own `index_imports`, exposing exactly the
/// arrangements this dataflow may read.
///
/// The real `environmentd` optimizer is handed a catalog-backed oracle that knows
/// every index on the cluster; the test driver has no catalog, but a dataflow's
/// `index_imports` already name exactly the arrangements available to it, so they
/// are the correct — and only — index information to expose. Without this, the
/// optimizer would not recognize an imported index and would re-plan a `Get` over
/// the indexed collection as a (non-existent) persist read.
#[derive(Debug)]
struct ImportedIndexOracle {
    /// `on_id` -> the `(index_id, key)` arrangements imported on it.
    ///
    /// One collection may carry several imported indexes, hence the `Vec`. A
    /// collection without any imported index has no entry at all, and is reported
    /// as having no indexes.
    by_on_id: BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>)>>,
}
90
91impl ImportedIndexOracle {
92    /// Build the oracle from a dataflow's `index_imports`, grouping by arranged id.
93    fn new(index_imports: &BTreeMap<GlobalId, IndexImport>) -> Self {
94        let mut by_on_id: BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>)>> = BTreeMap::new();
95        for (index_id, import) in index_imports {
96            by_on_id
97                .entry(import.desc.on_id)
98                .or_default()
99                .push((*index_id, import.desc.key.clone()));
100        }
101        ImportedIndexOracle { by_on_id }
102    }
103}
104
105impl IndexOracle for ImportedIndexOracle {
106    fn indexes_on(
107        &self,
108        id: GlobalId,
109    ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
110        match self.by_on_id.get(&id) {
111            Some(indexes) => Box::new(indexes.iter().map(|(id, key)| (*id, key.as_slice()))),
112            None => Box::new(std::iter::empty()),
113        }
114    }
115}
116
/// A handle to an imported collection or built object, used to reference it when
/// constructing MIR for further objects.
#[derive(Clone, Debug)]
pub struct Input {
    /// The id a computation `Get`s through this handle: the source id for a
    /// persist import, or the arranged collection's id (not the index id) for an
    /// index import.
    id: GlobalId,
    /// The relation type attached to every `Get` produced from this handle.
    typ: ReprRelationType,
}
124
125impl Input {
126    /// The id this input is bound to in the dataflow.
127    pub fn id(&self) -> GlobalId {
128        self.id
129    }
130
131    /// A MIR `Get` of this input, carrying its relation type, for use as a leaf
132    /// when building a computation over it.
133    pub fn get(&self) -> MirRelationExpr {
134        MirRelationExpr::global_get(self.id, self.typ.clone())
135    }
136}
137
/// Builds a compute dataflow from generic parts, hiding the lowering and persist
/// wiring mechanism.
///
/// # Contract
///
/// By default the caller supplies MIR and the builder lowers it *faithfully*,
/// attaching the persist wiring without optimizing — so a hand-built minimal plan
/// lowers exactly as written. Optimization — fusion, predicate pushdown, and notably
/// join-implementation selection — is opt-in via [`Self::optimize`], paid for only
/// by callers that need it. A `Join` whose `implementation` is left `Unimplemented`
/// is rejected by the LIR lowering, so a plan containing one requires `optimize`,
/// which runs [`mz_transform::optimize_dataflow`] to fill the implementation first.
/// When optimizing, the builder hands the optimizer an index oracle built from its
/// own `index_imports` (`ImportedIndexOracle`), so imported arrangements are
/// recognized — the same index information `environmentd`'s catalog oracle would
/// supply for these imports.
///
/// # Construction strategy
///
/// The builder deliberately does *not* hand-roll the [`RenderPlan`]: the [`LirId`]s
/// used to stitch nodes together have no public constructor, and the [`LetFreePlan`]
/// invariants (notably a valid `topological_order`) are easy to get wrong. Instead
/// it mirrors exactly what the real compute controller does:
///
///  1. Accumulate a MIR-level [`DataflowDescription<OptimizedMirRelationExpr, ()>`]
///     using the same [`import_source`] / [`insert_plan`] / [`export_index`] helpers
///     the optimizer uses.
///  2. Lower it to LIR via [`Plan::finalize_dataflow`], yielding
///     [`DataflowDescription<Plan, ()>`].
///  3. Augment it into [`DataflowDescription<RenderPlan, CollectionMetadata>`] by
///     converting each object's [`Plan`] via [`RenderPlan::try_from`] and attaching
///     the storage [`CollectionMetadata`] to each source import — the same step
///     performed in `compute-client`'s `Instance::create_dataflow`.
///
/// This guarantees the emitted plan is structurally identical to one produced by a
/// live `environmentd`, at the cost of running the (cheap, deterministic) lowering
/// in-process.
///
/// [`LirId`]: mz_compute_types::plan::LirId
/// [`LetFreePlan`]: mz_compute_types::plan::render_plan::LetFreePlan
/// [`import_source`]: DataflowDescription::import_source
/// [`insert_plan`]: DataflowDescription::insert_plan
/// [`export_index`]: DataflowDescription::export_index
/// [`DataflowDescription<OptimizedMirRelationExpr, ()>`]: DataflowDescription
/// [`DataflowDescription<Plan, ()>`]: DataflowDescription
/// [`DataflowDescription<RenderPlan, CollectionMetadata>`]: DataflowDescription
pub struct DataflowBuilder {
    /// The MIR-level description being accumulated. Every import, build, and
    /// export call records its part here; `finish` consumes it.
    mir: DataflowDescription<OptimizedMirRelationExpr, ()>,
    /// Persist metadata per imported source id, consumed by the augment step.
    /// Populated by `import_persist` only; index imports add no entry.
    sources: BTreeMap<GlobalId, PersistSource>,
    /// Target storage metadata per materialized-view sink id, consumed by the
    /// augment step to fill the sink connection's `storage_metadata`.
    /// Populated by `export_materialized_view` only; subscribes add no entry.
    sinks: BTreeMap<GlobalId, CollectionMetadata>,
    /// Relation type per referenceable id (imports and built objects), so
    /// `export_index` can derive the `on_type` instead of taking it as an argument.
    /// The by-id `get` accessor reads the same map to type the `Get` it returns.
    types: BTreeMap<GlobalId, ReprRelationType>,
    /// Whether `finish` runs the MIR dataflow optimizer before lowering. Off by
    /// default (faithful lowering of the caller's MIR); see [`Self::optimize`].
    optimize: bool,
}
199
200impl DataflowBuilder {
201    /// Start an empty builder. `name` becomes the dataflow's debug name.
202    pub fn new(name: impl Into<String>) -> Self {
203        DataflowBuilder {
204            mir: DataflowDescription::new(name.into()),
205            sources: BTreeMap::new(),
206            sinks: BTreeMap::new(),
207            types: BTreeMap::new(),
208            optimize: false,
209        }
210    }
211
212    /// Import a persist-backed storage collection as `id`.
213    ///
214    /// Registers the source on the MIR description and records the persist metadata
215    /// for the augment step. Returns an [`Input`] handle whose [`Input::get`] yields
216    /// a correctly typed `Get` node, so callers never construct a [`ReprRelationType`]
217    /// by hand.
218    pub fn import_persist(&mut self, id: GlobalId, source: PersistSource) -> Input {
219        // `import_source` takes the `SqlRelationType`; the `Get`/export path wants the
220        // `ReprRelationType`. Both are derived from the single `desc`.
221        let sql_typ = source.desc.typ().clone();
222        let repr_typ = ReprRelationType::from(source.desc.typ());
223        // `monotonic: false` matches the verified-structure requirement.
224        self.mir.import_source(id, sql_typ, false);
225        self.sources.insert(id, source);
226        self.types.insert(id, repr_typ.clone());
227        Input { id, typ: repr_typ }
228    }
229
230    /// Import a previously-exported index, making the collection it arranges
231    /// (`on_id`) available to this dataflow as an in-memory arrangement.
232    ///
233    /// Unlike [`Self::import_persist`], this imports no storage collection: the
234    /// arrangement is served from the replica's existing, hydrated index, so the
235    /// dataflow needs no [`CollectionMetadata`] and the augment step leaves the
236    /// index import untouched. The MIR-to-LIR lowering registers the imported
237    /// arrangement under `Get(on_id)` automatically, so a faithful (unoptimized)
238    /// `Get(on_id)` picks it up. Returns an [`Input`] referencing `on_id` — the
239    /// id a computation `Get`s, not the index id itself.
240    pub fn import_index(
241        &mut self,
242        index_id: GlobalId,
243        on_id: GlobalId,
244        key_cols: Vec<usize>,
245        on_type: ReprRelationType,
246        monotonic: bool,
247    ) -> Input {
248        let key: Vec<MirScalarExpr> = key_cols.into_iter().map(MirScalarExpr::column).collect();
249        self.mir.import_index(
250            index_id,
251            IndexDesc { on_id, key },
252            on_type.clone(),
253            monotonic,
254        );
255        self.types.insert(on_id, on_type.clone());
256        Input {
257            id: on_id,
258            typ: on_type,
259        }
260    }
261
262    /// A typed `Get` of an already-imported or built id, for callers that
263    /// assemble MIR by id rather than threading [`Input`] handles — notably the
264    /// JSON MIR translator. Errors if `id` was never imported or built, so a bad
265    /// reference surfaces cleanly instead of constructing an ill-typed `Get`.
266    pub fn get(&self, id: GlobalId) -> anyhow::Result<MirRelationExpr> {
267        let typ = self
268            .types
269            .get(&id)
270            .ok_or_else(|| anyhow::anyhow!("get of unknown id {id}; import or build it first"))?
271            .clone();
272        Ok(MirRelationExpr::global_get(id, typ))
273    }
274
275    /// Insert a MIR object to compute, bound to `id`.
276    ///
277    /// `expr` is wrapped via [`OptimizedMirRelationExpr::declare_optimized`]; the
278    /// caller is responsible for any optimization (see the type-level contract). The
279    /// object's relation type is recorded so a later [`Self::export_index`] over `id`
280    /// can derive its `on_type`.
281    pub fn build(&mut self, id: GlobalId, expr: MirRelationExpr) -> &mut Self {
282        self.types.insert(id, expr.typ());
283        self.mir
284            .insert_plan(id, OptimizedMirRelationExpr::declare_optimized(expr));
285        self
286    }
287
288    /// Export an index `index_id` arranging `on_id` by `key_cols`.
289    ///
290    /// `on_id` may be an imported source or a built object; either way the lowering
291    /// synthesizes the `ArrangeBy`. The `on_type` is derived from the referenced id,
292    /// which must have been imported or built first.
293    pub fn export_index(
294        &mut self,
295        index_id: GlobalId,
296        on_id: GlobalId,
297        key_cols: Vec<usize>,
298    ) -> &mut Self {
299        let on_type = self
300            .types
301            .get(&on_id)
302            .unwrap_or_else(|| panic!("export_index on unknown id {on_id}"))
303            .clone();
304        let key: Vec<MirScalarExpr> = key_cols.into_iter().map(MirScalarExpr::column).collect();
305        self.mir
306            .export_index(index_id, IndexDesc { on_id, key }, on_type);
307        self
308    }
309
310    /// Export a materialized-view persist sink `sink_id` writing the collection
311    /// `from_id` to a target persist shard (a materialized view).
312    ///
313    /// `value_desc` is the output relation schema; it must match `from_id`'s type
314    /// (validated by the caller). The target shard is identified by `target`, whose
315    /// `CollectionMetadata` the augment step splices into the sink connection — the
316    /// compute persist sink opens it as `SourceData/()/Timestamp/StorageDiff`, the
317    /// same codec a storage collection uses, so the shard reads back like any other.
318    ///
319    /// `up_to` is always the empty antichain: the persist sink does not implement
320    /// `UP TO` (it panics during rendering otherwise), and the real optimizer
321    /// likewise leaves a materialized view's `up_to` empty — it is a subscribe-only
322    /// concept.
323    pub fn export_materialized_view(
324        &mut self,
325        sink_id: GlobalId,
326        from_id: GlobalId,
327        value_desc: RelationDesc,
328        target: PersistSink,
329    ) -> &mut Self {
330        let metadata = CollectionMetadata {
331            persist_location: target.location,
332            data_shard: target.shard,
333            relation_desc: value_desc.clone(),
334            txns_shard: None,
335        };
336        self.sinks.insert(sink_id, metadata);
337        // The MIR-level description carries the unit storage metadata; the augment
338        // step replaces it with the `CollectionMetadata` recorded above.
339        let desc = ComputeSinkDesc {
340            from: from_id,
341            from_desc: value_desc.clone(),
342            connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
343                value_desc,
344                storage_metadata: (),
345            }),
346            with_snapshot: true,
347            up_to: Antichain::new(),
348            non_null_assertions: vec![],
349            refresh_schedule: None,
350        };
351        self.mir.export_sink(sink_id, desc);
352        self
353    }
354
355    /// Export a subscribe sink `sink_id` streaming changes of the collection
356    /// `from_id` back as `ComputeResponse::SubscribeResponse` batches.
357    ///
358    /// Unlike a materialized view, a subscribe writes no shard, so it needs no
359    /// storage metadata. `value_desc` is the output schema (must match `from_id`'s
360    /// type); `up_to` is the exclusive upper at which the subscribe completes. The
361    /// empty `output` ordering leaves intra-timestamp order unconstrained — the
362    /// driver consolidates and sorts the updates for a deterministic golden.
363    pub fn export_subscribe(
364        &mut self,
365        sink_id: GlobalId,
366        from_id: GlobalId,
367        value_desc: RelationDesc,
368        up_to: Antichain<Timestamp>,
369    ) -> &mut Self {
370        let desc = ComputeSinkDesc {
371            from: from_id,
372            from_desc: value_desc,
373            connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection {
374                output: vec![],
375            }),
376            with_snapshot: true,
377            up_to,
378            non_null_assertions: vec![],
379            refresh_schedule: None,
380        };
381        self.mir.export_sink(sink_id, desc);
382        self
383    }
384
385    /// Set the dataflow's `as_of` (the read frontier hydration starts from).
386    pub fn as_of(&mut self, t: Timestamp) -> &mut Self {
387        self.mir.as_of = Some(Antichain::from_elem(t));
388        self
389    }
390
391    /// Set the dataflow's `until` (the exclusive upper bound past which output is
392    /// dropped). Defaults to the empty antichain (no bound).
393    pub fn until(&mut self, t: Timestamp) -> &mut Self {
394        self.mir.until = Antichain::from_elem(t);
395        self
396    }
397
398    /// Run the MIR dataflow optimizer in [`Self::finish`] before lowering.
399    ///
400    /// Off by default: the builder otherwise lowers the caller's MIR faithfully (the
401    /// contract above). Enable it for plans that don't lower from raw MIR — notably a
402    /// `Join`, whose `implementation` defaults to `Unimplemented` and is rejected by
403    /// the LIR lowering until [`mz_transform::optimize_dataflow`]'s `JoinImplementation`
404    /// fills it in — or to reproduce the plan `environmentd` would ship for a logical
405    /// expression rather than the literal one written.
406    pub fn optimize(&mut self) -> &mut Self {
407        self.optimize = true;
408        self
409    }
410
411    /// Lower the accumulated MIR and attach persist wiring, producing the
412    /// `DataflowDescription` the compute protocol expects.
413    ///
414    /// Returns an error rather than panicking on a malformed plan (e.g. a key
415    /// column out of range, or an unbalanced object graph), so a caller driving
416    /// this from external input — notably the script reader — can surface a clean
417    /// error instead of crashing the process.
418    pub fn finish(mut self) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
419        let features = OptimizerFeatures::default();
420        // Optionally run the MIR dataflow optimizer first (e.g. to fill a `Join`'s
421        // implementation). The index oracle is built from this dataflow's own
422        // `index_imports`, so the optimizer recognizes imported arrangements and
423        // plans `Get`s over them as arrangement reads (not persist reads); the
424        // statistics oracle is empty — no catalog stats — so join planning falls
425        // back to a differential join, which lowers.
426        if self.optimize {
427            let indexes = ImportedIndexOracle::new(&self.mir.index_imports);
428            let typecheck_ctx = empty_typechecking_context();
429            let mut df_meta = DataflowMetainfo::default();
430            let mut ctx = TransformCtx::global(
431                &indexes,
432                &EmptyStatisticsOracle,
433                &features,
434                &typecheck_ctx,
435                &mut df_meta,
436                None,
437            );
438            optimize_dataflow(&mut self.mir, &mut ctx, false)
439                .map_err(|e| anyhow::anyhow!("optimizing dataflow failed: {e}"))?;
440        }
441        // Lower MIR -> LIR. Deterministic and self-contained.
442        let lowered: DataflowDescription<Plan, ()> =
443            Plan::finalize_dataflow(self.mir, &features)
444                .map_err(|e| anyhow::anyhow!("lowering dataflow failed: {e}"))?;
445        augment(lowered, &self.sources, &self.sinks)
446    }
447}
448
449/// Build a single-index dataflow over a persist shard.
450///
451/// Thin sugar over [`DataflowBuilder`] for the common shape: import the collection
452/// backed by `shard` as `source_id`, set `as_of`, and export an index `index_id`
453/// arranging the collection by `key_cols`.
454///
455/// `shard_upper` is the exclusive upper bound of the shard's written data; see
456/// [`PersistSource::upper`].
457pub fn index_dataflow(
458    source_id: GlobalId,
459    index_id: GlobalId,
460    shard: ShardId,
461    location: PersistLocation,
462    desc: RelationDesc,
463    key_cols: Vec<usize>,
464    as_of: Timestamp,
465    shard_upper: Timestamp,
466) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
467    let mut builder = DataflowBuilder::new("headless-index");
468    builder.import_persist(
469        source_id,
470        PersistSource {
471            shard,
472            location,
473            desc,
474            upper: shard_upper,
475        },
476    );
477    builder.as_of(as_of);
478    builder.export_index(index_id, source_id, key_cols);
479    builder.finish()
480}
481
482/// Build a dataflow that counts the rows of an existing index and exports the
483/// count as a new, peekable index.
484///
485/// Imports index `index_id` (arranging `on_id`, schema `on_type`, key `key_cols`),
486/// computes `Reduce` with a single `count(*)` aggregate and an empty group key over
487/// `Get(on_id)`, and exports `out_index_id` arranging the one-column count by `[0]`.
488/// This is the compute-side realization of a row-count assertion: the count runs
489/// through a real reduce operator rather than being tallied in the driver.
490///
491/// The result collection has one `bigint` column. Over an empty input the reduce
492/// emits no rows (SQL's default-zero is added higher up), so a peek of the output
493/// yields `[]`, which callers read as a count of `0`.
494pub fn count_over_index(
495    index_id: GlobalId,
496    on_id: GlobalId,
497    on_type: ReprRelationType,
498    key_cols: Vec<usize>,
499    reduce_id: GlobalId,
500    out_index_id: GlobalId,
501    as_of: Timestamp,
502) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
503    let mut builder = DataflowBuilder::new("headless-count");
504    // `monotonic: false` keeps the import faithful to a general (non-append-only)
505    // index; the count reduce does not require monotonicity.
506    let input = builder.import_index(index_id, on_id, key_cols, on_type, false);
507    // `count(*)`: count over a non-null literal, so every row contributes.
508    let count = AggregateExpr {
509        func: AggregateFunc::Count,
510        expr: MirScalarExpr::literal_true(),
511        distinct: false,
512    };
513    let reduce = MirRelationExpr::Reduce {
514        input: Box::new(input.get()),
515        group_key: vec![],
516        aggregates: vec![count],
517        monotonic: false,
518        expected_group_size: None,
519    };
520    builder.build(reduce_id, reduce);
521    builder.as_of(as_of);
522    // The reduce output is a single column; arrange it by that column so the
523    // exported index is peekable.
524    builder.export_index(out_index_id, reduce_id, vec![0]);
525    builder.finish()
526}
527
528/// Convert a lowered `DataflowDescription<Plan, ()>` into the
529/// `<RenderPlan, CollectionMetadata>` form expected by the compute protocol.
530///
531/// Mirrors `compute-client`'s `Instance::create_dataflow`: each object's [`Plan`]
532/// is flattened into a [`RenderPlan`], and every source import is augmented with the
533/// storage [`CollectionMetadata`] needed by the compute instance to read it. The
534/// per-id [`PersistSource`] supplies the metadata and the exclusive `upper` telling
535/// the compute instance up to which timestamp the shard's data is available.
536fn augment(
537    lowered: DataflowDescription<Plan, ()>,
538    sources: &BTreeMap<GlobalId, PersistSource>,
539    sinks: &BTreeMap<GlobalId, CollectionMetadata>,
540) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
541    // Attach the storage metadata to each source import, looked up by id. In a live
542    // controller the `upper` is the storage collection's real write frontier; the
543    // caller provides it via `PersistSource::upper` to reflect the written data.
544    let mut source_imports = BTreeMap::new();
545    for (id, import) in lowered.source_imports {
546        let source = sources
547            .get(&id)
548            .ok_or_else(|| anyhow::anyhow!("no persist metadata registered for source {id}"))?;
549        let metadata = CollectionMetadata {
550            persist_location: source.location.clone(),
551            data_shard: source.shard,
552            relation_desc: source.desc.clone(),
553            txns_shard: None,
554        };
555        let desc = SourceInstanceDesc {
556            storage_metadata: metadata,
557            arguments: import.desc.arguments,
558            typ: import.desc.typ,
559        };
560        source_imports.insert(
561            id,
562            SourceImport {
563                desc,
564                monotonic: import.monotonic,
565                with_snapshot: import.with_snapshot,
566                upper: Antichain::from_elem(source.upper),
567            },
568        );
569    }
570
571    let objects_to_build = lowered
572        .objects_to_build
573        .into_iter()
574        .map(|object| {
575            // `RenderPlan::try_from` fails (with `()`) on a structurally invalid
576            // lowered plan; surface it as an error rather than panicking.
577            let plan = RenderPlan::try_from(object.plan)
578                .map_err(|()| anyhow::anyhow!("RenderPlan conversion failed for {}", object.id))?;
579            Ok::<_, anyhow::Error>(BuildDesc {
580                id: object.id,
581                plan,
582            })
583        })
584        .collect::<anyhow::Result<Vec<_>>>()?;
585
586    // Splice the storage metadata into each sink export, mirroring how
587    // `compute-client`'s `Instance::create_dataflow` fills the materialized-view
588    // sink's `storage_metadata` from the storage controller. A subscribe carries no
589    // metadata; copy-to is not built by this driver.
590    let mut sink_exports = BTreeMap::new();
591    for (id, sink) in lowered.sink_exports {
592        let connection = match sink.connection {
593            ComputeSinkConnection::MaterializedView(conn) => {
594                let metadata = sinks.get(&id).ok_or_else(|| {
595                    anyhow::anyhow!("no target metadata registered for materialized-view sink {id}")
596                })?;
597                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
598                    value_desc: conn.value_desc,
599                    storage_metadata: metadata.clone(),
600                })
601            }
602            ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
603            ComputeSinkConnection::CopyToS3Oneshot(_) => {
604                anyhow::bail!("copy-to-s3 sink {id} is not implemented")
605            }
606        };
607        sink_exports.insert(
608            id,
609            ComputeSinkDesc {
610                from: sink.from,
611                from_desc: sink.from_desc,
612                connection,
613                with_snapshot: sink.with_snapshot,
614                up_to: sink.up_to,
615                non_null_assertions: sink.non_null_assertions,
616                refresh_schedule: sink.refresh_schedule,
617            },
618        );
619    }
620
621    Ok(DataflowDescription {
622        source_imports,
623        objects_to_build,
624        // The remaining fields carry over unchanged from the lowered dataflow.
625        index_imports: lowered.index_imports,
626        index_exports: lowered.index_exports,
627        sink_exports,
628        as_of: lowered.as_of,
629        until: lowered.until,
630        initial_storage_as_of: lowered.initial_storage_as_of,
631        refresh_schedule: lowered.refresh_schedule,
632        debug_name: lowered.debug_name,
633        time_dependence: lowered.time_dependence,
634    })
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640
641    use mz_compute_types::plan::GetPlan;
642    use mz_compute_types::plan::render_plan::Expr;
643    use mz_expr::Id;
644
    /// Assert the assembled dataflow matches the verified structure: a single
    /// source import, a single object building `Get(source) -> ArrangeBy(key)`,
    /// and a single index export over the source.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn index_dataflow_structure() {
        let desc = crate::data::sample_desc();
        let loc = PersistLocation {
            blob_uri: "mem://".parse().unwrap(),
            consensus_uri: "mem://".parse().unwrap(),
        };
        // Source id 1000, index id 1001, keyed on column 0, `as_of` 0, and a
        // shard upper of 1.
        let df = index_dataflow(
            GlobalId::User(1000),
            GlobalId::User(1001),
            ShardId::new(),
            loc,
            desc,
            vec![0],
            Timestamp::from(0),
            Timestamp::from(1),
        )
        .unwrap();
        // Structural assertions mirroring the spec.
        assert_eq!(df.source_imports.len(), 1);
        assert_eq!(df.objects_to_build.len(), 1);
        assert_eq!(df.index_exports.len(), 1);
        assert!(df.sink_exports.is_empty());
        assert!(df.index_imports.is_empty());
        assert_eq!(df.as_of, Some(Antichain::from_elem(Timestamp::from(0))));
        assert_eq!(df.debug_name, "headless-index");

        // The source import carries the id, the non-monotonic flag, and the
        // `upper` that were passed in.
        let (sid, si) = df.source_imports.iter().next().unwrap();
        assert_eq!(*sid, GlobalId::User(1000));
        assert!(si.with_snapshot);
        assert!(!si.monotonic);
        assert_eq!(si.upper, Antichain::from_elem(Timestamp::from(1)));
        assert!(si.desc.arguments.operators.is_none());

        // The index export arranges the source (not a built object) by column 0.
        let (iid, (idesc, _typ)) = df.index_exports.iter().next().unwrap();
        assert_eq!(*iid, GlobalId::User(1001));
        assert_eq!(idesc.on_id, GlobalId::User(1000));
        assert_eq!(idesc.key, vec![MirScalarExpr::column(0)]);

        // The built object is `Get(source) -> ArrangeBy(key)`. Destructure the
        // `RenderPlan` and verify the root arranges, keyed by `Column(0)`, over a
        // `Get` of the source collection.
        let plan = &df.objects_to_build[0].plan;
        assert!(plan.binds.is_empty());
        let (nodes, root, _order) = plan.body.clone().destruct();
        let root_node = &nodes[&root];
        let Expr::ArrangeBy {
            input,
            forms,
            strategy,
            ..
        } = &root_node.expr
        else {
            panic!("expected root ArrangeBy, got {:?}", root_node.expr);
        };
        assert_eq!(forms.arranged.len(), 1);
        assert_eq!(forms.arranged[0].0, vec![MirScalarExpr::column(0)]);
        assert_eq!(
            *strategy,
            mz_compute_types::plan::ArrangementStrategy::Direct
        );
        // Follow the `ArrangeBy`'s input edge to the node it arranges.
        let input_node = &nodes[input];
        let Expr::Get { id, plan, .. } = &input_node.expr else {
            panic!("expected ArrangeBy input Get, got {:?}", input_node.expr);
        };
        assert_eq!(*id, Id::Global(GlobalId::User(1000)));
        assert!(matches!(plan, GetPlan::PassArrangements));
    }
717
718    /// Exercise the general `build` path: import a source, compute a `Project` over
719    /// it, and export an index on the computed object. The computation and the
720    /// arrange must lower to two distinct objects, and the index export must
721    /// reference the built object rather than the source.
722    #[mz_ore::test]
723    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
724    fn build_computed_object_lowers() {
725        let desc = crate::data::sample_desc();
726        let loc = PersistLocation {
727            blob_uri: "mem://".parse().unwrap(),
728            consensus_uri: "mem://".parse().unwrap(),
729        };
730        let (source_id, comp_id, index_id) = (
731            GlobalId::User(1000),
732            GlobalId::User(1001),
733            GlobalId::User(1002),
734        );
735
736        let mut builder = DataflowBuilder::new("headless-build");
737        let src = builder.import_persist(
738            source_id,
739            PersistSource {
740                shard: ShardId::new(),
741                location: loc,
742                desc,
743                upper: Timestamp::from(1),
744            },
745        );
746        // Project away the payload column, keeping only `id` (column 0).
747        builder.build(comp_id, src.get().project(vec![0]));
748        builder.as_of(Timestamp::from(0));
749        builder.export_index(index_id, comp_id, vec![0]);
750        let df = builder.finish().unwrap();
751
752        // One source import; the index export references the computed object.
753        assert_eq!(df.source_imports.len(), 1);
754        assert!(df.source_imports.contains_key(&source_id));
755        let (iid, (idesc, _typ)) = df.index_exports.iter().next().unwrap();
756        assert_eq!(*iid, index_id);
757        assert_eq!(idesc.on_id, comp_id);
758
759        // The computation and the arrange lower to two distinct build objects.
760        assert_eq!(df.objects_to_build.len(), 2);
761        let ids: Vec<_> = df.objects_to_build.iter().map(|o| o.id).collect();
762        assert!(ids.contains(&comp_id));
763        assert!(ids.contains(&index_id));
764    }
765
766    /// A `Join` does not lower from raw MIR — its `implementation` defaults to
767    /// `Unimplemented` and the LIR lowering rejects it — but `optimize()` runs the
768    /// MIR optimizer first, which fills the implementation, so the same dataflow
769    /// then lowers. This is exactly what the `optimize` flag buys.
770    #[mz_ore::test]
771    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
772    fn join_lowers_only_with_optimize() {
773        let loc = PersistLocation {
774            blob_uri: "mem://".parse().unwrap(),
775            consensus_uri: "mem://".parse().unwrap(),
776        };
777        // Build a two-source equi-join (`#0 = #2` across the concatenated columns)
778        // and export an index over it. `optimize` selects whether the MIR optimizer
779        // runs in `finish`.
780        let assemble = |optimize: bool| {
781            let mut builder = DataflowBuilder::new("headless-join-test");
782            let left = builder.import_persist(
783                GlobalId::User(1000),
784                PersistSource {
785                    shard: ShardId::new(),
786                    location: loc.clone(),
787                    desc: crate::data::sample_desc(),
788                    upper: Timestamp::from(1),
789                },
790            );
791            let right = builder.import_persist(
792                GlobalId::User(1001),
793                PersistSource {
794                    shard: ShardId::new(),
795                    location: loc.clone(),
796                    desc: crate::data::sample_desc(),
797                    upper: Timestamp::from(1),
798                },
799            );
800            let join = MirRelationExpr::join_scalars(
801                vec![left.get(), right.get()],
802                vec![vec![MirScalarExpr::column(0), MirScalarExpr::column(2)]],
803            );
804            builder.build(GlobalId::User(2000), join);
805            if optimize {
806                builder.optimize();
807            }
808            builder.as_of(Timestamp::from(0));
809            builder.export_index(GlobalId::User(2001), GlobalId::User(2000), vec![0]);
810            builder.finish()
811        };
812
813        // Without the optimizer the `Unimplemented` join is rejected by the lowering.
814        assert!(assemble(false).is_err());
815        // With it, the optimizer fills the join implementation and the dataflow lowers.
816        assert!(assemble(true).is_ok());
817    }
818
819    /// A single dataflow can export both an index and a materialized view over the
820    /// same built object (binding). Both exports reference that object; the index
821    /// arranges it and the MV sink writes it to a target shard.
822    #[mz_ore::test]
823    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
824    fn index_and_mv_same_binding() {
825        let desc = crate::data::sample_desc();
826        let loc = PersistLocation {
827            blob_uri: "mem://".parse().unwrap(),
828            consensus_uri: "mem://".parse().unwrap(),
829        };
830        let (source_id, view_id, index_id, sink_id) = (
831            GlobalId::User(1000),
832            GlobalId::User(1001),
833            GlobalId::User(1002),
834            GlobalId::User(1003),
835        );
836
837        let mut builder = DataflowBuilder::new("headless-index-and-mv");
838        let src = builder.import_persist(
839            source_id,
840            PersistSource {
841                shard: ShardId::new(),
842                location: loc.clone(),
843                desc: desc.clone(),
844                upper: Timestamp::from(1),
845            },
846        );
847        // A view over the source is the shared binding both exports reference.
848        builder.build(
849            view_id,
850            src.get().filter(vec![MirScalarExpr::literal_true()]),
851        );
852        builder.as_of(Timestamp::from(0));
853        builder.export_index(index_id, view_id, vec![0]);
854        builder.export_materialized_view(
855            sink_id,
856            view_id,
857            desc,
858            PersistSink {
859                shard: ShardId::new(),
860                location: loc,
861            },
862        );
863        let df = builder.finish().unwrap();
864
865        // Both exports are present and reference the same view binding.
866        assert_eq!(df.index_exports.len(), 1);
867        assert_eq!(df.sink_exports.len(), 1);
868        let (_iid, (idesc, _typ)) = df.index_exports.iter().next().unwrap();
869        assert_eq!(idesc.on_id, view_id);
870        let (sid, sink) = df.sink_exports.iter().next().unwrap();
871        assert_eq!(*sid, sink_id);
872        assert_eq!(sink.from, view_id);
873        // The MV sink carries the target shard's storage metadata after augment.
874        assert!(matches!(
875            sink.connection,
876            ComputeSinkConnection::MaterializedView(_)
877        ));
878    }
879
880    /// With `optimize` on, the optimizer is handed an index oracle built from the
881    /// dataflow's `index_imports`, so a `Get` over an imported (but not persisted)
882    /// collection is recognized as an arrangement read. Were the oracle empty, the
883    /// optimizer would re-plan that `Get` as a persist read of a collection that has
884    /// no source import, and `finish` would fail — so success here, with one index
885    /// import and no source imports, is the proof the index information reached the
886    /// optimizer.
887    #[mz_ore::test]
888    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
889    fn optimize_uses_imported_index() {
890        let desc = crate::data::sample_desc();
891        let on_type = ReprRelationType::from(desc.typ());
892        let (index_id, on_id, view_id, out_index_id) = (
893            GlobalId::User(1001),
894            GlobalId::User(1000),
895            GlobalId::User(2000),
896            GlobalId::User(2001),
897        );
898
899        let mut builder = DataflowBuilder::new("headless-optimize-imported-index");
900        let input = builder.import_index(index_id, on_id, vec![0], on_type, false);
901        // A view over the imported arrangement; with `optimize` the optimizer must
902        // recognize the import to plan the `Get` as an arrangement read.
903        builder.build(view_id, input.get().project(vec![0]));
904        builder.optimize();
905        builder.as_of(Timestamp::from(0));
906        builder.export_index(out_index_id, view_id, vec![0]);
907        let df = builder.finish().unwrap();
908
909        // The collection is read from the imported arrangement, not from persist:
910        // exactly one index import, no source imports.
911        assert_eq!(df.index_imports.len(), 1);
912        assert!(df.source_imports.is_empty());
913        let (iid, import) = df.index_imports.iter().next().unwrap();
914        assert_eq!(*iid, index_id);
915        assert_eq!(import.desc.on_id, on_id);
916    }
917
918    /// A count-over-index dataflow imports the index (no storage source), builds
919    /// the reduce and its arrange as two objects, and exports the count index.
920    #[mz_ore::test]
921    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
922    fn count_over_index_structure() {
923        let desc = crate::data::sample_desc();
924        let on_type = ReprRelationType::from(desc.typ());
925        let df = count_over_index(
926            GlobalId::User(1001), // existing index to import
927            GlobalId::User(1000), // collection it arranges
928            on_type,
929            vec![0],              // its key
930            GlobalId::User(2000), // reduce build object
931            GlobalId::User(2001), // exported count index
932            Timestamp::from(0),
933        )
934        .unwrap();
935
936        // Imports the arrangement, not a storage collection.
937        assert_eq!(df.index_imports.len(), 1);
938        assert!(df.source_imports.is_empty());
939        let (iid, import) = df.index_imports.iter().next().unwrap();
940        assert_eq!(*iid, GlobalId::User(1001));
941        assert_eq!(import.desc.on_id, GlobalId::User(1000));
942        assert_eq!(import.desc.key, vec![MirScalarExpr::column(0)]);
943
944        // Reduce + arrange lower to two build objects; the count index exports.
945        assert_eq!(df.objects_to_build.len(), 2);
946        assert_eq!(df.index_exports.len(), 1);
947        let (eid, (edesc, _typ)) = df.index_exports.iter().next().unwrap();
948        assert_eq!(*eid, GlobalId::User(2001));
949        assert_eq!(edesc.on_id, GlobalId::User(2000));
950        assert_eq!(edesc.key, vec![MirScalarExpr::column(0)]);
951    }
952}