mz_clusterd_test_driver/dataflow.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Assembly of compute [`DataflowDescription`]s for the headless test driver.
11//!
12//! [`DataflowBuilder`] is the generic boundary between tests and the dataflow
13//! assembly mechanism. A test describes its dataflow in terms of persist imports,
14//! MIR objects to compute, and index exports; the builder owns the parts that are
15//! hard and reusable — the MIR-to-LIR lowering, the [`RenderPlan`] conversion, the
16//! [`CollectionMetadata`] attachment, and the `SqlRelationType`-versus-
17//! `ReprRelationType` bookkeeping — and produces a
18//! `DataflowDescription<RenderPlan, CollectionMetadata>` ready to ship as
19//! [`ComputeCommand::CreateDataflow`].
20//!
21//! [`index_dataflow`] is thin sugar over the builder for the common single-index
22//! shape.
23//!
24//! [`ComputeCommand::CreateDataflow`]: mz_compute_client::protocol::command::ComputeCommand::CreateDataflow
25
26use std::collections::BTreeMap;
27
28use mz_compute_types::dataflows::{
29 BuildDesc, DataflowDescription, IndexDesc, IndexImport, SourceImport,
30};
31use mz_compute_types::plan::Plan;
32use mz_compute_types::plan::render_plan::RenderPlan;
33use mz_compute_types::sinks::{
34 ComputeSinkConnection, ComputeSinkDesc, MaterializedViewSinkConnection, SubscribeSinkConnection,
35};
36use mz_compute_types::sources::SourceInstanceDesc;
37use mz_expr::{
38 AggregateExpr, AggregateFunc, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
39};
40use mz_persist_types::{PersistLocation, ShardId};
41use mz_repr::optimize::OptimizerFeatures;
42use mz_repr::{GlobalId, RelationDesc, ReprRelationType, Timestamp};
43use mz_storage_types::controller::CollectionMetadata;
44use mz_transform::dataflow::DataflowMetainfo;
45use mz_transform::typecheck::empty_typechecking_context;
46use mz_transform::{EmptyStatisticsOracle, IndexOracle, TransformCtx, optimize_dataflow};
47use timely::progress::Antichain;
48
49/// A persist-backed storage collection to import into a dataflow.
50///
51/// `upper` is the exclusive upper bound of the shard's written data (the next
52/// timestamp after the last written one): for data written at a single timestamp
53/// `t`, pass `t + 1`; for data spread across `0..n_ts`, pass `n_ts`. The compute
54/// instance uses it to know when the source's data is fully available.
55#[derive(Clone, Debug)]
56pub struct PersistSource {
57 /// The data shard backing the collection.
58 pub shard: ShardId,
59 /// The persist location (blob + consensus) the shard lives in.
60 pub location: PersistLocation,
61 /// The relation schema of the collection.
62 pub desc: RelationDesc,
63 /// The exclusive upper bound of the shard's written data.
64 pub upper: Timestamp,
65}
66
67/// A persist-backed target shard for a materialized-view sink to write to.
68#[derive(Clone, Debug)]
69pub struct PersistSink {
70 /// The data shard the sink writes its output to.
71 pub shard: ShardId,
72 /// The persist location (blob + consensus) the shard lives in.
73 pub location: PersistLocation,
74}
75
76/// An [`IndexOracle`] over a dataflow's own `index_imports`, exposing exactly the
77/// arrangements this dataflow may read.
78///
79/// The real `environmentd` optimizer is handed a catalog-backed oracle that knows
80/// every index on the cluster; the test driver has no catalog, but a dataflow's
81/// `index_imports` already name exactly the arrangements available to it, so they
82/// are the correct — and only — index information to expose. Without this, the
83/// optimizer would not recognize an imported index and would re-plan a `Get` over
84/// the indexed collection as a (non-existent) persist read.
85#[derive(Debug)]
86struct ImportedIndexOracle {
87 /// `on_id` -> the `(index_id, key)` arrangements imported on it.
88 by_on_id: BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>)>>,
89}
90
91impl ImportedIndexOracle {
92 /// Build the oracle from a dataflow's `index_imports`, grouping by arranged id.
93 fn new(index_imports: &BTreeMap<GlobalId, IndexImport>) -> Self {
94 let mut by_on_id: BTreeMap<GlobalId, Vec<(GlobalId, Vec<MirScalarExpr>)>> = BTreeMap::new();
95 for (index_id, import) in index_imports {
96 by_on_id
97 .entry(import.desc.on_id)
98 .or_default()
99 .push((*index_id, import.desc.key.clone()));
100 }
101 ImportedIndexOracle { by_on_id }
102 }
103}
104
105impl IndexOracle for ImportedIndexOracle {
106 fn indexes_on(
107 &self,
108 id: GlobalId,
109 ) -> Box<dyn Iterator<Item = (GlobalId, &[MirScalarExpr])> + '_> {
110 match self.by_on_id.get(&id) {
111 Some(indexes) => Box::new(indexes.iter().map(|(id, key)| (*id, key.as_slice()))),
112 None => Box::new(std::iter::empty()),
113 }
114 }
115}
116
117/// A handle to an imported collection or built object, used to reference it when
118/// constructing MIR for further objects.
119#[derive(Clone, Debug)]
120pub struct Input {
121 id: GlobalId,
122 typ: ReprRelationType,
123}
124
125impl Input {
126 /// The id this input is bound to in the dataflow.
127 pub fn id(&self) -> GlobalId {
128 self.id
129 }
130
131 /// A MIR `Get` of this input, carrying its relation type, for use as a leaf
132 /// when building a computation over it.
133 pub fn get(&self) -> MirRelationExpr {
134 MirRelationExpr::global_get(self.id, self.typ.clone())
135 }
136}
137
138/// Builds a compute dataflow from generic parts, hiding the lowering and persist
139/// wiring mechanism.
140///
141/// # Contract
142///
143/// By default the caller supplies MIR and the builder lowers it *faithfully*,
144/// attaching the persist wiring without optimizing — so a hand-built minimal plan
145/// lowers exactly as written. Optimization — fusion, predicate pushdown, and notably
146/// join-implementation selection — is opt-in via [`Self::optimize`], paid for only
147/// by callers that need it. A `Join` whose `implementation` is left `Unimplemented`
148/// is rejected by the LIR lowering, so a plan containing one requires `optimize`,
149/// which runs [`mz_transform::optimize_dataflow`] to fill the implementation first.
150/// When optimizing, the builder hands the optimizer an index oracle built from its
151/// own `index_imports` (`ImportedIndexOracle`), so imported arrangements are
152/// recognized — the same index information `environmentd`'s catalog oracle would
153/// supply for these imports.
154///
155/// # Construction strategy
156///
157/// The builder deliberately does *not* hand-roll the [`RenderPlan`]: the [`LirId`]s
158/// used to stitch nodes together have no public constructor, and the [`LetFreePlan`]
159/// invariants (notably a valid `topological_order`) are easy to get wrong. Instead
160/// it mirrors exactly what the real compute controller does:
161///
162/// 1. Accumulate a MIR-level [`DataflowDescription<OptimizedMirRelationExpr, ()>`]
163/// using the same [`import_source`] / [`insert_plan`] / [`export_index`] helpers
164/// the optimizer uses.
165/// 2. Lower it to LIR via [`Plan::finalize_dataflow`], yielding
166/// [`DataflowDescription<Plan, ()>`].
167/// 3. Augment it into [`DataflowDescription<RenderPlan, CollectionMetadata>`] by
168/// converting each object's [`Plan`] via [`RenderPlan::try_from`] and attaching
169/// the storage [`CollectionMetadata`] to each source import — the same step
170/// performed in `compute-client`'s `Instance::create_dataflow`.
171///
172/// This guarantees the emitted plan is structurally identical to one produced by a
173/// live `environmentd`, at the cost of running the (cheap, deterministic) lowering
174/// in-process.
175///
176/// [`LirId`]: mz_compute_types::plan::LirId
177/// [`LetFreePlan`]: mz_compute_types::plan::render_plan::LetFreePlan
178/// [`import_source`]: DataflowDescription::import_source
179/// [`insert_plan`]: DataflowDescription::insert_plan
180/// [`export_index`]: DataflowDescription::export_index
181/// [`DataflowDescription<OptimizedMirRelationExpr, ()>`]: DataflowDescription
182/// [`DataflowDescription<Plan, ()>`]: DataflowDescription
183/// [`DataflowDescription<RenderPlan, CollectionMetadata>`]: DataflowDescription
184pub struct DataflowBuilder {
185 /// The MIR-level description being accumulated.
186 mir: DataflowDescription<OptimizedMirRelationExpr, ()>,
187 /// Persist metadata per imported source id, consumed by the augment step.
188 sources: BTreeMap<GlobalId, PersistSource>,
189 /// Target storage metadata per materialized-view sink id, consumed by the
190 /// augment step to fill the sink connection's `storage_metadata`.
191 sinks: BTreeMap<GlobalId, CollectionMetadata>,
192 /// Relation type per referenceable id (imports and built objects), so
193 /// `export_index` can derive the `on_type` instead of taking it as an argument.
194 types: BTreeMap<GlobalId, ReprRelationType>,
195 /// Whether `finish` runs the MIR dataflow optimizer before lowering. Off by
196 /// default (faithful lowering of the caller's MIR); see [`Self::optimize`].
197 optimize: bool,
198}
199
200impl DataflowBuilder {
201 /// Start an empty builder. `name` becomes the dataflow's debug name.
202 pub fn new(name: impl Into<String>) -> Self {
203 DataflowBuilder {
204 mir: DataflowDescription::new(name.into()),
205 sources: BTreeMap::new(),
206 sinks: BTreeMap::new(),
207 types: BTreeMap::new(),
208 optimize: false,
209 }
210 }
211
212 /// Import a persist-backed storage collection as `id`.
213 ///
214 /// Registers the source on the MIR description and records the persist metadata
215 /// for the augment step. Returns an [`Input`] handle whose [`Input::get`] yields
216 /// a correctly typed `Get` node, so callers never construct a [`ReprRelationType`]
217 /// by hand.
218 pub fn import_persist(&mut self, id: GlobalId, source: PersistSource) -> Input {
219 // `import_source` takes the `SqlRelationType`; the `Get`/export path wants the
220 // `ReprRelationType`. Both are derived from the single `desc`.
221 let sql_typ = source.desc.typ().clone();
222 let repr_typ = ReprRelationType::from(source.desc.typ());
223 // `monotonic: false` matches the verified-structure requirement.
224 self.mir.import_source(id, sql_typ, false);
225 self.sources.insert(id, source);
226 self.types.insert(id, repr_typ.clone());
227 Input { id, typ: repr_typ }
228 }
229
230 /// Import a previously-exported index, making the collection it arranges
231 /// (`on_id`) available to this dataflow as an in-memory arrangement.
232 ///
233 /// Unlike [`Self::import_persist`], this imports no storage collection: the
234 /// arrangement is served from the replica's existing, hydrated index, so the
235 /// dataflow needs no [`CollectionMetadata`] and the augment step leaves the
236 /// index import untouched. The MIR-to-LIR lowering registers the imported
237 /// arrangement under `Get(on_id)` automatically, so a faithful (unoptimized)
238 /// `Get(on_id)` picks it up. Returns an [`Input`] referencing `on_id` — the
239 /// id a computation `Get`s, not the index id itself.
240 pub fn import_index(
241 &mut self,
242 index_id: GlobalId,
243 on_id: GlobalId,
244 key_cols: Vec<usize>,
245 on_type: ReprRelationType,
246 monotonic: bool,
247 ) -> Input {
248 let key: Vec<MirScalarExpr> = key_cols.into_iter().map(MirScalarExpr::column).collect();
249 self.mir.import_index(
250 index_id,
251 IndexDesc { on_id, key },
252 on_type.clone(),
253 monotonic,
254 );
255 self.types.insert(on_id, on_type.clone());
256 Input {
257 id: on_id,
258 typ: on_type,
259 }
260 }
261
262 /// A typed `Get` of an already-imported or built id, for callers that
263 /// assemble MIR by id rather than threading [`Input`] handles — notably the
264 /// JSON MIR translator. Errors if `id` was never imported or built, so a bad
265 /// reference surfaces cleanly instead of constructing an ill-typed `Get`.
266 pub fn get(&self, id: GlobalId) -> anyhow::Result<MirRelationExpr> {
267 let typ = self
268 .types
269 .get(&id)
270 .ok_or_else(|| anyhow::anyhow!("get of unknown id {id}; import or build it first"))?
271 .clone();
272 Ok(MirRelationExpr::global_get(id, typ))
273 }
274
275 /// Insert a MIR object to compute, bound to `id`.
276 ///
277 /// `expr` is wrapped via [`OptimizedMirRelationExpr::declare_optimized`]; the
278 /// caller is responsible for any optimization (see the type-level contract). The
279 /// object's relation type is recorded so a later [`Self::export_index`] over `id`
280 /// can derive its `on_type`.
281 pub fn build(&mut self, id: GlobalId, expr: MirRelationExpr) -> &mut Self {
282 self.types.insert(id, expr.typ());
283 self.mir
284 .insert_plan(id, OptimizedMirRelationExpr::declare_optimized(expr));
285 self
286 }
287
288 /// Export an index `index_id` arranging `on_id` by `key_cols`.
289 ///
290 /// `on_id` may be an imported source or a built object; either way the lowering
291 /// synthesizes the `ArrangeBy`. The `on_type` is derived from the referenced id,
292 /// which must have been imported or built first.
293 pub fn export_index(
294 &mut self,
295 index_id: GlobalId,
296 on_id: GlobalId,
297 key_cols: Vec<usize>,
298 ) -> &mut Self {
299 let on_type = self
300 .types
301 .get(&on_id)
302 .unwrap_or_else(|| panic!("export_index on unknown id {on_id}"))
303 .clone();
304 let key: Vec<MirScalarExpr> = key_cols.into_iter().map(MirScalarExpr::column).collect();
305 self.mir
306 .export_index(index_id, IndexDesc { on_id, key }, on_type);
307 self
308 }
309
310 /// Export a materialized-view persist sink `sink_id` writing the collection
311 /// `from_id` to a target persist shard (a materialized view).
312 ///
313 /// `value_desc` is the output relation schema; it must match `from_id`'s type
314 /// (validated by the caller). The target shard is identified by `target`, whose
315 /// `CollectionMetadata` the augment step splices into the sink connection — the
316 /// compute persist sink opens it as `SourceData/()/Timestamp/StorageDiff`, the
317 /// same codec a storage collection uses, so the shard reads back like any other.
318 ///
319 /// `up_to` is always the empty antichain: the persist sink does not implement
320 /// `UP TO` (it panics during rendering otherwise), and the real optimizer
321 /// likewise leaves a materialized view's `up_to` empty — it is a subscribe-only
322 /// concept.
323 pub fn export_materialized_view(
324 &mut self,
325 sink_id: GlobalId,
326 from_id: GlobalId,
327 value_desc: RelationDesc,
328 target: PersistSink,
329 ) -> &mut Self {
330 let metadata = CollectionMetadata {
331 persist_location: target.location,
332 data_shard: target.shard,
333 relation_desc: value_desc.clone(),
334 txns_shard: None,
335 };
336 self.sinks.insert(sink_id, metadata);
337 // The MIR-level description carries the unit storage metadata; the augment
338 // step replaces it with the `CollectionMetadata` recorded above.
339 let desc = ComputeSinkDesc {
340 from: from_id,
341 from_desc: value_desc.clone(),
342 connection: ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
343 value_desc,
344 storage_metadata: (),
345 }),
346 with_snapshot: true,
347 up_to: Antichain::new(),
348 non_null_assertions: vec![],
349 refresh_schedule: None,
350 };
351 self.mir.export_sink(sink_id, desc);
352 self
353 }
354
355 /// Export a subscribe sink `sink_id` streaming changes of the collection
356 /// `from_id` back as `ComputeResponse::SubscribeResponse` batches.
357 ///
358 /// Unlike a materialized view, a subscribe writes no shard, so it needs no
359 /// storage metadata. `value_desc` is the output schema (must match `from_id`'s
360 /// type); `up_to` is the exclusive upper at which the subscribe completes. The
361 /// empty `output` ordering leaves intra-timestamp order unconstrained — the
362 /// driver consolidates and sorts the updates for a deterministic golden.
363 pub fn export_subscribe(
364 &mut self,
365 sink_id: GlobalId,
366 from_id: GlobalId,
367 value_desc: RelationDesc,
368 up_to: Antichain<Timestamp>,
369 ) -> &mut Self {
370 let desc = ComputeSinkDesc {
371 from: from_id,
372 from_desc: value_desc,
373 connection: ComputeSinkConnection::Subscribe(SubscribeSinkConnection {
374 output: vec![],
375 }),
376 with_snapshot: true,
377 up_to,
378 non_null_assertions: vec![],
379 refresh_schedule: None,
380 };
381 self.mir.export_sink(sink_id, desc);
382 self
383 }
384
385 /// Set the dataflow's `as_of` (the read frontier hydration starts from).
386 pub fn as_of(&mut self, t: Timestamp) -> &mut Self {
387 self.mir.as_of = Some(Antichain::from_elem(t));
388 self
389 }
390
391 /// Set the dataflow's `until` (the exclusive upper bound past which output is
392 /// dropped). Defaults to the empty antichain (no bound).
393 pub fn until(&mut self, t: Timestamp) -> &mut Self {
394 self.mir.until = Antichain::from_elem(t);
395 self
396 }
397
398 /// Run the MIR dataflow optimizer in [`Self::finish`] before lowering.
399 ///
400 /// Off by default: the builder otherwise lowers the caller's MIR faithfully (the
401 /// contract above). Enable it for plans that don't lower from raw MIR — notably a
402 /// `Join`, whose `implementation` defaults to `Unimplemented` and is rejected by
403 /// the LIR lowering until [`mz_transform::optimize_dataflow`]'s `JoinImplementation`
404 /// fills it in — or to reproduce the plan `environmentd` would ship for a logical
405 /// expression rather than the literal one written.
406 pub fn optimize(&mut self) -> &mut Self {
407 self.optimize = true;
408 self
409 }
410
411 /// Lower the accumulated MIR and attach persist wiring, producing the
412 /// `DataflowDescription` the compute protocol expects.
413 ///
414 /// Returns an error rather than panicking on a malformed plan (e.g. a key
415 /// column out of range, or an unbalanced object graph), so a caller driving
416 /// this from external input — notably the script reader — can surface a clean
417 /// error instead of crashing the process.
418 pub fn finish(mut self) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
419 let features = OptimizerFeatures::default();
420 // Optionally run the MIR dataflow optimizer first (e.g. to fill a `Join`'s
421 // implementation). The index oracle is built from this dataflow's own
422 // `index_imports`, so the optimizer recognizes imported arrangements and
423 // plans `Get`s over them as arrangement reads (not persist reads); the
424 // statistics oracle is empty — no catalog stats — so join planning falls
425 // back to a differential join, which lowers.
426 if self.optimize {
427 let indexes = ImportedIndexOracle::new(&self.mir.index_imports);
428 let typecheck_ctx = empty_typechecking_context();
429 let mut df_meta = DataflowMetainfo::default();
430 let mut ctx = TransformCtx::global(
431 &indexes,
432 &EmptyStatisticsOracle,
433 &features,
434 &typecheck_ctx,
435 &mut df_meta,
436 None,
437 );
438 optimize_dataflow(&mut self.mir, &mut ctx, false)
439 .map_err(|e| anyhow::anyhow!("optimizing dataflow failed: {e}"))?;
440 }
441 // Lower MIR -> LIR. Deterministic and self-contained.
442 let lowered: DataflowDescription<Plan, ()> =
443 Plan::finalize_dataflow(self.mir, &features)
444 .map_err(|e| anyhow::anyhow!("lowering dataflow failed: {e}"))?;
445 augment(lowered, &self.sources, &self.sinks)
446 }
447}
448
449/// Build a single-index dataflow over a persist shard.
450///
451/// Thin sugar over [`DataflowBuilder`] for the common shape: import the collection
452/// backed by `shard` as `source_id`, set `as_of`, and export an index `index_id`
453/// arranging the collection by `key_cols`.
454///
455/// `shard_upper` is the exclusive upper bound of the shard's written data; see
456/// [`PersistSource::upper`].
457pub fn index_dataflow(
458 source_id: GlobalId,
459 index_id: GlobalId,
460 shard: ShardId,
461 location: PersistLocation,
462 desc: RelationDesc,
463 key_cols: Vec<usize>,
464 as_of: Timestamp,
465 shard_upper: Timestamp,
466) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
467 let mut builder = DataflowBuilder::new("headless-index");
468 builder.import_persist(
469 source_id,
470 PersistSource {
471 shard,
472 location,
473 desc,
474 upper: shard_upper,
475 },
476 );
477 builder.as_of(as_of);
478 builder.export_index(index_id, source_id, key_cols);
479 builder.finish()
480}
481
482/// Build a dataflow that counts the rows of an existing index and exports the
483/// count as a new, peekable index.
484///
485/// Imports index `index_id` (arranging `on_id`, schema `on_type`, key `key_cols`),
486/// computes `Reduce` with a single `count(*)` aggregate and an empty group key over
487/// `Get(on_id)`, and exports `out_index_id` arranging the one-column count by `[0]`.
488/// This is the compute-side realization of a row-count assertion: the count runs
489/// through a real reduce operator rather than being tallied in the driver.
490///
491/// The result collection has one `bigint` column. Over an empty input the reduce
492/// emits no rows (SQL's default-zero is added higher up), so a peek of the output
493/// yields `[]`, which callers read as a count of `0`.
494pub fn count_over_index(
495 index_id: GlobalId,
496 on_id: GlobalId,
497 on_type: ReprRelationType,
498 key_cols: Vec<usize>,
499 reduce_id: GlobalId,
500 out_index_id: GlobalId,
501 as_of: Timestamp,
502) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
503 let mut builder = DataflowBuilder::new("headless-count");
504 // `monotonic: false` keeps the import faithful to a general (non-append-only)
505 // index; the count reduce does not require monotonicity.
506 let input = builder.import_index(index_id, on_id, key_cols, on_type, false);
507 // `count(*)`: count over a non-null literal, so every row contributes.
508 let count = AggregateExpr {
509 func: AggregateFunc::Count,
510 expr: MirScalarExpr::literal_true(),
511 distinct: false,
512 };
513 let reduce = MirRelationExpr::Reduce {
514 input: Box::new(input.get()),
515 group_key: vec![],
516 aggregates: vec![count],
517 monotonic: false,
518 expected_group_size: None,
519 };
520 builder.build(reduce_id, reduce);
521 builder.as_of(as_of);
522 // The reduce output is a single column; arrange it by that column so the
523 // exported index is peekable.
524 builder.export_index(out_index_id, reduce_id, vec![0]);
525 builder.finish()
526}
527
528/// Convert a lowered `DataflowDescription<Plan, ()>` into the
529/// `<RenderPlan, CollectionMetadata>` form expected by the compute protocol.
530///
531/// Mirrors `compute-client`'s `Instance::create_dataflow`: each object's [`Plan`]
532/// is flattened into a [`RenderPlan`], and every source import is augmented with the
533/// storage [`CollectionMetadata`] needed by the compute instance to read it. The
534/// per-id [`PersistSource`] supplies the metadata and the exclusive `upper` telling
535/// the compute instance up to which timestamp the shard's data is available.
536fn augment(
537 lowered: DataflowDescription<Plan, ()>,
538 sources: &BTreeMap<GlobalId, PersistSource>,
539 sinks: &BTreeMap<GlobalId, CollectionMetadata>,
540) -> anyhow::Result<DataflowDescription<RenderPlan, CollectionMetadata>> {
541 // Attach the storage metadata to each source import, looked up by id. In a live
542 // controller the `upper` is the storage collection's real write frontier; the
543 // caller provides it via `PersistSource::upper` to reflect the written data.
544 let mut source_imports = BTreeMap::new();
545 for (id, import) in lowered.source_imports {
546 let source = sources
547 .get(&id)
548 .ok_or_else(|| anyhow::anyhow!("no persist metadata registered for source {id}"))?;
549 let metadata = CollectionMetadata {
550 persist_location: source.location.clone(),
551 data_shard: source.shard,
552 relation_desc: source.desc.clone(),
553 txns_shard: None,
554 };
555 let desc = SourceInstanceDesc {
556 storage_metadata: metadata,
557 arguments: import.desc.arguments,
558 typ: import.desc.typ,
559 };
560 source_imports.insert(
561 id,
562 SourceImport {
563 desc,
564 monotonic: import.monotonic,
565 with_snapshot: import.with_snapshot,
566 upper: Antichain::from_elem(source.upper),
567 },
568 );
569 }
570
571 let objects_to_build = lowered
572 .objects_to_build
573 .into_iter()
574 .map(|object| {
575 // `RenderPlan::try_from` fails (with `()`) on a structurally invalid
576 // lowered plan; surface it as an error rather than panicking.
577 let plan = RenderPlan::try_from(object.plan)
578 .map_err(|()| anyhow::anyhow!("RenderPlan conversion failed for {}", object.id))?;
579 Ok::<_, anyhow::Error>(BuildDesc {
580 id: object.id,
581 plan,
582 })
583 })
584 .collect::<anyhow::Result<Vec<_>>>()?;
585
586 // Splice the storage metadata into each sink export, mirroring how
587 // `compute-client`'s `Instance::create_dataflow` fills the materialized-view
588 // sink's `storage_metadata` from the storage controller. A subscribe carries no
589 // metadata; copy-to is not built by this driver.
590 let mut sink_exports = BTreeMap::new();
591 for (id, sink) in lowered.sink_exports {
592 let connection = match sink.connection {
593 ComputeSinkConnection::MaterializedView(conn) => {
594 let metadata = sinks.get(&id).ok_or_else(|| {
595 anyhow::anyhow!("no target metadata registered for materialized-view sink {id}")
596 })?;
597 ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
598 value_desc: conn.value_desc,
599 storage_metadata: metadata.clone(),
600 })
601 }
602 ComputeSinkConnection::Subscribe(conn) => ComputeSinkConnection::Subscribe(conn),
603 ComputeSinkConnection::CopyToS3Oneshot(_) => {
604 anyhow::bail!("copy-to-s3 sink {id} is not implemented")
605 }
606 };
607 sink_exports.insert(
608 id,
609 ComputeSinkDesc {
610 from: sink.from,
611 from_desc: sink.from_desc,
612 connection,
613 with_snapshot: sink.with_snapshot,
614 up_to: sink.up_to,
615 non_null_assertions: sink.non_null_assertions,
616 refresh_schedule: sink.refresh_schedule,
617 },
618 );
619 }
620
621 Ok(DataflowDescription {
622 source_imports,
623 objects_to_build,
624 // The remaining fields carry over unchanged from the lowered dataflow.
625 index_imports: lowered.index_imports,
626 index_exports: lowered.index_exports,
627 sink_exports,
628 as_of: lowered.as_of,
629 until: lowered.until,
630 initial_storage_as_of: lowered.initial_storage_as_of,
631 refresh_schedule: lowered.refresh_schedule,
632 debug_name: lowered.debug_name,
633 time_dependence: lowered.time_dependence,
634 })
635}
636
637#[cfg(test)]
638mod tests {
639 use super::*;
640
641 use mz_compute_types::plan::GetPlan;
642 use mz_compute_types::plan::render_plan::Expr;
643 use mz_expr::Id;
644
645 /// Assert the assembled dataflow matches the verified structure: a single
646 /// source import, a single object building `Get(source) -> ArrangeBy(key)`,
647 /// and a single index export over the source.
648 #[mz_ore::test]
649 #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
650 fn index_dataflow_structure() {
651 let desc = crate::data::sample_desc();
652 let loc = PersistLocation {
653 blob_uri: "mem://".parse().unwrap(),
654 consensus_uri: "mem://".parse().unwrap(),
655 };
656 let df = index_dataflow(
657 GlobalId::User(1000),
658 GlobalId::User(1001),
659 ShardId::new(),
660 loc,
661 desc,
662 vec![0],
663 Timestamp::from(0),
664 Timestamp::from(1),
665 )
666 .unwrap();
667 // Structural assertions mirroring the spec.
668 assert_eq!(df.source_imports.len(), 1);
669 assert_eq!(df.objects_to_build.len(), 1);
670 assert_eq!(df.index_exports.len(), 1);
671 assert!(df.sink_exports.is_empty());
672 assert!(df.index_imports.is_empty());
673 assert_eq!(df.as_of, Some(Antichain::from_elem(Timestamp::from(0))));
674 assert_eq!(df.debug_name, "headless-index");
675
676 let (sid, si) = df.source_imports.iter().next().unwrap();
677 assert_eq!(*sid, GlobalId::User(1000));
678 assert!(si.with_snapshot);
679 assert!(!si.monotonic);
680 assert_eq!(si.upper, Antichain::from_elem(Timestamp::from(1)));
681 assert!(si.desc.arguments.operators.is_none());
682
683 let (iid, (idesc, _typ)) = df.index_exports.iter().next().unwrap();
684 assert_eq!(*iid, GlobalId::User(1001));
685 assert_eq!(idesc.on_id, GlobalId::User(1000));
686 assert_eq!(idesc.key, vec![MirScalarExpr::column(0)]);
687
688 // The built object is `Get(source) -> ArrangeBy(key)`. Destructure the
689 // `RenderPlan` and verify the root arranges, keyed by `Column(0)`, over a
690 // `Get` of the source collection.
691 let plan = &df.objects_to_build[0].plan;
692 assert!(plan.binds.is_empty());
693 let (nodes, root, _order) = plan.body.clone().destruct();
694 let root_node = &nodes[&root];
695 let Expr::ArrangeBy {
696 input,
697 forms,
698 strategy,
699 ..
700 } = &root_node.expr
701 else {
702 panic!("expected root ArrangeBy, got {:?}", root_node.expr);
703 };
704 assert_eq!(forms.arranged.len(), 1);
705 assert_eq!(forms.arranged[0].0, vec![MirScalarExpr::column(0)]);
706 assert_eq!(
707 *strategy,
708 mz_compute_types::plan::ArrangementStrategy::Direct
709 );
710 let input_node = &nodes[input];
711 let Expr::Get { id, plan, .. } = &input_node.expr else {
712 panic!("expected ArrangeBy input Get, got {:?}", input_node.expr);
713 };
714 assert_eq!(*id, Id::Global(GlobalId::User(1000)));
715 assert!(matches!(plan, GetPlan::PassArrangements));
716 }
717
718 /// Exercise the general `build` path: import a source, compute a `Project` over
719 /// it, and export an index on the computed object. The computation and the
720 /// arrange must lower to two distinct objects, and the index export must
721 /// reference the built object rather than the source.
722 #[mz_ore::test]
723 #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
724 fn build_computed_object_lowers() {
725 let desc = crate::data::sample_desc();
726 let loc = PersistLocation {
727 blob_uri: "mem://".parse().unwrap(),
728 consensus_uri: "mem://".parse().unwrap(),
729 };
730 let (source_id, comp_id, index_id) = (
731 GlobalId::User(1000),
732 GlobalId::User(1001),
733 GlobalId::User(1002),
734 );
735
736 let mut builder = DataflowBuilder::new("headless-build");
737 let src = builder.import_persist(
738 source_id,
739 PersistSource {
740 shard: ShardId::new(),
741 location: loc,
742 desc,
743 upper: Timestamp::from(1),
744 },
745 );
746 // Project away the payload column, keeping only `id` (column 0).
747 builder.build(comp_id, src.get().project(vec![0]));
748 builder.as_of(Timestamp::from(0));
749 builder.export_index(index_id, comp_id, vec![0]);
750 let df = builder.finish().unwrap();
751
752 // One source import; the index export references the computed object.
753 assert_eq!(df.source_imports.len(), 1);
754 assert!(df.source_imports.contains_key(&source_id));
755 let (iid, (idesc, _typ)) = df.index_exports.iter().next().unwrap();
756 assert_eq!(*iid, index_id);
757 assert_eq!(idesc.on_id, comp_id);
758
759 // The computation and the arrange lower to two distinct build objects.
760 assert_eq!(df.objects_to_build.len(), 2);
761 let ids: Vec<_> = df.objects_to_build.iter().map(|o| o.id).collect();
762 assert!(ids.contains(&comp_id));
763 assert!(ids.contains(&index_id));
764 }
765
766 /// A `Join` does not lower from raw MIR — its `implementation` defaults to
767 /// `Unimplemented` and the LIR lowering rejects it — but `optimize()` runs the
768 /// MIR optimizer first, which fills the implementation, so the same dataflow
769 /// then lowers. This is exactly what the `optimize` flag buys.
770 #[mz_ore::test]
771 #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
772 fn join_lowers_only_with_optimize() {
773 let loc = PersistLocation {
774 blob_uri: "mem://".parse().unwrap(),
775 consensus_uri: "mem://".parse().unwrap(),
776 };
777 // Build a two-source equi-join (`#0 = #2` across the concatenated columns)
778 // and export an index over it. `optimize` selects whether the MIR optimizer
779 // runs in `finish`.
780 let assemble = |optimize: bool| {
781 let mut builder = DataflowBuilder::new("headless-join-test");
782 let left = builder.import_persist(
783 GlobalId::User(1000),
784 PersistSource {
785 shard: ShardId::new(),
786 location: loc.clone(),
787 desc: crate::data::sample_desc(),
788 upper: Timestamp::from(1),
789 },
790 );
791 let right = builder.import_persist(
792 GlobalId::User(1001),
793 PersistSource {
794 shard: ShardId::new(),
795 location: loc.clone(),
796 desc: crate::data::sample_desc(),
797 upper: Timestamp::from(1),
798 },
799 );
800 let join = MirRelationExpr::join_scalars(
801 vec![left.get(), right.get()],
802 vec![vec![MirScalarExpr::column(0), MirScalarExpr::column(2)]],
803 );
804 builder.build(GlobalId::User(2000), join);
805 if optimize {
806 builder.optimize();
807 }
808 builder.as_of(Timestamp::from(0));
809 builder.export_index(GlobalId::User(2001), GlobalId::User(2000), vec![0]);
810 builder.finish()
811 };
812
813 // Without the optimizer the `Unimplemented` join is rejected by the lowering.
814 assert!(assemble(false).is_err());
815 // With it, the optimizer fills the join implementation and the dataflow lowers.
816 assert!(assemble(true).is_ok());
817 }
818
819 /// A single dataflow can export both an index and a materialized view over the
820 /// same built object (binding). Both exports reference that object; the index
821 /// arranges it and the MV sink writes it to a target shard.
822 #[mz_ore::test]
823 #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
824 fn index_and_mv_same_binding() {
825 let desc = crate::data::sample_desc();
826 let loc = PersistLocation {
827 blob_uri: "mem://".parse().unwrap(),
828 consensus_uri: "mem://".parse().unwrap(),
829 };
830 let (source_id, view_id, index_id, sink_id) = (
831 GlobalId::User(1000),
832 GlobalId::User(1001),
833 GlobalId::User(1002),
834 GlobalId::User(1003),
835 );
836
837 let mut builder = DataflowBuilder::new("headless-index-and-mv");
838 let src = builder.import_persist(
839 source_id,
840 PersistSource {
841 shard: ShardId::new(),
842 location: loc.clone(),
843 desc: desc.clone(),
844 upper: Timestamp::from(1),
845 },
846 );
847 // A view over the source is the shared binding both exports reference.
848 builder.build(
849 view_id,
850 src.get().filter(vec![MirScalarExpr::literal_true()]),
851 );
852 builder.as_of(Timestamp::from(0));
853 builder.export_index(index_id, view_id, vec![0]);
854 builder.export_materialized_view(
855 sink_id,
856 view_id,
857 desc,
858 PersistSink {
859 shard: ShardId::new(),
860 location: loc,
861 },
862 );
863 let df = builder.finish().unwrap();
864
865 // Both exports are present and reference the same view binding.
866 assert_eq!(df.index_exports.len(), 1);
867 assert_eq!(df.sink_exports.len(), 1);
868 let (_iid, (idesc, _typ)) = df.index_exports.iter().next().unwrap();
869 assert_eq!(idesc.on_id, view_id);
870 let (sid, sink) = df.sink_exports.iter().next().unwrap();
871 assert_eq!(*sid, sink_id);
872 assert_eq!(sink.from, view_id);
873 // The MV sink carries the target shard's storage metadata after augment.
874 assert!(matches!(
875 sink.connection,
876 ComputeSinkConnection::MaterializedView(_)
877 ));
878 }
879
880 /// With `optimize` on, the optimizer is handed an index oracle built from the
881 /// dataflow's `index_imports`, so a `Get` over an imported (but not persisted)
882 /// collection is recognized as an arrangement read. Were the oracle empty, the
883 /// optimizer would re-plan that `Get` as a persist read of a collection that has
884 /// no source import, and `finish` would fail — so success here, with one index
885 /// import and no source imports, is the proof the index information reached the
886 /// optimizer.
887 #[mz_ore::test]
888 #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
889 fn optimize_uses_imported_index() {
890 let desc = crate::data::sample_desc();
891 let on_type = ReprRelationType::from(desc.typ());
892 let (index_id, on_id, view_id, out_index_id) = (
893 GlobalId::User(1001),
894 GlobalId::User(1000),
895 GlobalId::User(2000),
896 GlobalId::User(2001),
897 );
898
899 let mut builder = DataflowBuilder::new("headless-optimize-imported-index");
900 let input = builder.import_index(index_id, on_id, vec![0], on_type, false);
901 // A view over the imported arrangement; with `optimize` the optimizer must
902 // recognize the import to plan the `Get` as an arrangement read.
903 builder.build(view_id, input.get().project(vec![0]));
904 builder.optimize();
905 builder.as_of(Timestamp::from(0));
906 builder.export_index(out_index_id, view_id, vec![0]);
907 let df = builder.finish().unwrap();
908
909 // The collection is read from the imported arrangement, not from persist:
910 // exactly one index import, no source imports.
911 assert_eq!(df.index_imports.len(), 1);
912 assert!(df.source_imports.is_empty());
913 let (iid, import) = df.index_imports.iter().next().unwrap();
914 assert_eq!(*iid, index_id);
915 assert_eq!(import.desc.on_id, on_id);
916 }
917
918 /// A count-over-index dataflow imports the index (no storage source), builds
919 /// the reduce and its arrange as two objects, and exports the count index.
920 #[mz_ore::test]
921 #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
922 fn count_over_index_structure() {
923 let desc = crate::data::sample_desc();
924 let on_type = ReprRelationType::from(desc.typ());
925 let df = count_over_index(
926 GlobalId::User(1001), // existing index to import
927 GlobalId::User(1000), // collection it arranges
928 on_type,
929 vec![0], // its key
930 GlobalId::User(2000), // reduce build object
931 GlobalId::User(2001), // exported count index
932 Timestamp::from(0),
933 )
934 .unwrap();
935
936 // Imports the arrangement, not a storage collection.
937 assert_eq!(df.index_imports.len(), 1);
938 assert!(df.source_imports.is_empty());
939 let (iid, import) = df.index_imports.iter().next().unwrap();
940 assert_eq!(*iid, GlobalId::User(1001));
941 assert_eq!(import.desc.on_id, GlobalId::User(1000));
942 assert_eq!(import.desc.key, vec![MirScalarExpr::column(0)]);
943
944 // Reduce + arrange lower to two build objects; the count index exports.
945 assert_eq!(df.objects_to_build.len(), 2);
946 assert_eq!(df.index_exports.len(), 1);
947 let (eid, (edesc, _typ)) = df.index_exports.iter().next().unwrap();
948 assert_eq!(*eid, GlobalId::User(2001));
949 assert_eq!(edesc.on_id, GlobalId::User(2000));
950 assert_eq!(edesc.key, vec![MirScalarExpr::column(0)]);
951 }
952}