Skip to main content

// mz_adapter/coord/sequencer/inner/create_continual_task.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_catalog::memory::objects::{
13    CatalogCollectionEntry, CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
14};
15use mz_compute_types::dataflows::DataflowDescription;
16use mz_compute_types::plan::Plan;
17use mz_compute_types::sinks::{
18    ComputeSinkConnection, ContinualTaskConnection, MaterializedViewSinkConnection,
19};
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::collections::CollectionExt;
22use mz_ore::instrument;
23use mz_repr::adt::mz_acl_item::PrivilegeMap;
24use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
25use mz_repr::{
26    GlobalId, RelationVersion, RelationVersionSelector, Timestamp, VersionedRelationDesc,
27};
28use mz_sql::ast::visit_mut::{VisitMut, VisitMutNode};
29use mz_sql::ast::{Raw, RawItemName};
30use mz_sql::names::{FullItemName, PartialItemName, ResolvedIds};
31use mz_sql::normalize::unresolved_item_name;
32use mz_sql::plan;
33use mz_sql::session::metadata::SessionMetadata;
34use mz_sql_parser::ast::Statement;
35use mz_sql_parser::ast::display::AstDisplay;
36use mz_storage_client::controller::CollectionDescription;
37use mz_transform::dataflow::DataflowMetainfo;
38use mz_transform::notice::OptimizerNotice;
39
40use crate::command::ExecuteResponse;
41use crate::coord::Coordinator;
42use crate::error::AdapterError;
43use crate::optimize::dataflows::dataflow_import_id_bundle;
44use crate::optimize::{self, Optimize, OptimizerCatalog};
45use crate::util::ResultExt;
46use crate::{ExecuteContext, catalog};
47
48impl Coordinator {
49    #[instrument]
50    pub(crate) async fn sequence_create_continual_task(
51        &mut self,
52        ctx: &mut ExecuteContext,
53        plan: plan::CreateContinualTaskPlan,
54        resolved_ids: ResolvedIds,
55    ) -> Result<ExecuteResponse, AdapterError> {
56        let desc = plan.desc.clone();
57        let name = plan.name.clone();
58        let cluster_id = plan.continual_task.cluster_id;
59
60        // Put a placeholder in the catalog so the optimizer can find something
61        // for the sink_id.
62        let (item_id, global_id) = self.allocate_user_id().await?;
63        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
64
65        let entry = CatalogEntry {
66            item: CatalogItem::Table(Table {
67                create_sql: None,
68                desc: VersionedRelationDesc::new(desc.clone()),
69                collections,
70                conn_id: None,
71                resolved_ids: resolved_ids.clone(),
72                custom_logical_compaction_window: None,
73                is_retained_metrics_object: false,
74                data_source: TableDataSource::TableWrites {
75                    defaults: Vec::new(),
76                },
77            }),
78            referenced_by: Vec::new(),
79            used_by: Vec::new(),
80            id: item_id,
81            oid: 0,
82            name: name.clone(),
83            owner_id: *ctx.session().current_role_id(),
84            privileges: PrivilegeMap::new(),
85        };
86        let bootstrap_catalog = ContinualTaskCatalogBootstrap {
87            delegate: self.owned_catalog().as_optimizer_catalog(),
88            sink_id: global_id,
89            entry: CatalogCollectionEntry {
90                entry,
91                version: RelationVersionSelector::Latest,
92            },
93        };
94
95        // Construct the CatalogItem for this CT and optimize it.
96        let mut item = crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids)?;
97        let full_name = bootstrap_catalog.resolve_full_name(&name, Some(ctx.session().conn_id()));
98        let (optimized_plan, mut physical_plan, metainfo, optimizer_features) = self
99            .optimize_create_continual_task(
100                &item,
101                global_id,
102                Arc::new(bootstrap_catalog),
103                full_name.to_string(),
104            )?;
105
106        // Timestamp selection
107        let mut id_bundle = dataflow_import_id_bundle(&physical_plan, cluster_id.clone());
108        // Can't acquire a read hold on ourselves because we don't exist yet.
109        //
110        // It is not necessary to take a read hold on the CT output in the
111        // coordinator, since the current scheme takes read holds in the
112        // coordinator only to ensure inputs don't get compacted until the
113        // compute controller has installed its own read holds, which happens
114        // below with the `ship_dataflow` call.
115        id_bundle.storage_ids.remove(&global_id);
116        let read_holds = self.acquire_read_holds(&id_bundle);
117        let as_of = read_holds.least_valid_read();
118        physical_plan.set_as_of(as_of.clone());
119        // Used in dataflow rendering to avoid the snapshot for CTs that are
120        // restarted after they have committed the snapshot output.
121        physical_plan.set_initial_as_of(as_of.clone());
122
123        // Rewrite `create_sql` to reference self with the fully qualified name.
124        // This is normally done when `create_sql` is created at plan time, but
125        // we didn't have the necessary info in name resolution.
126        item.create_sql = update_create_sql(&item.create_sql, &full_name, as_of.as_option());
127
128        let ops = vec![catalog::Op::CreateItem {
129            id: item_id,
130            name: name.clone(),
131            item: CatalogItem::ContinualTask(item),
132            owner_id: *ctx.session().current_role_id(),
133        }];
134
135        let () = self
136            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
137                Box::pin(async move {
138                    let catalog = coord.catalog_mut();
139                    catalog.set_optimized_plan(global_id, optimized_plan);
140                    catalog.set_physical_plan(global_id, physical_plan.clone());
141                    catalog.set_dataflow_metainfo(global_id, metainfo);
142                    catalog.cache_expressions(global_id, None, optimizer_features);
143
144                    coord
145                        .controller
146                        .storage
147                        .create_collections(
148                            coord.catalog.state().storage_metadata(),
149                            None,
150                            vec![(
151                                global_id,
152                                CollectionDescription::for_other(desc, Some(as_of)),
153                            )],
154                        )
155                        .await
156                        .unwrap_or_terminate("cannot fail to append");
157
158                    coord.ship_dataflow(physical_plan, cluster_id, None).await;
159                    coord.allow_writes(cluster_id, global_id);
160                })
161            })
162            .await?;
163        Ok(ExecuteResponse::CreatedContinualTask)
164    }
165
166    pub fn optimize_create_continual_task(
167        &self,
168        ct: &ContinualTask,
169        output_id: GlobalId,
170        catalog: Arc<dyn OptimizerCatalog>,
171        debug_name: String,
172    ) -> Result<
173        (
174            DataflowDescription<OptimizedMirRelationExpr>,
175            DataflowDescription<Plan>,
176            DataflowMetainfo<Arc<OptimizerNotice>>,
177            OptimizerFeatures,
178        ),
179        AdapterError,
180    > {
181        let catalog = Arc::new(NoIndexCatalog { delegate: catalog });
182
183        let (_, view_id) = self.allocate_transient_id();
184        let compute_instance = self
185            .instance_snapshot(ct.cluster_id)
186            .expect("compute instance does not exist");
187        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
188            .override_from(&self.catalog.get_cluster(ct.cluster_id).config.features());
189        let optimizer_features = optimizer_config.features.clone();
190        let non_null_assertions = Vec::new();
191        let refresh_schedule = None;
192        // Continual Tasks turn an "input" into diffs by inserting retractions,
193        // which removes any monotonicity properties the collection otherwise
194        // would have had.
195        let force_non_monotonic = [ct.input_id].into();
196        let mut optimizer = optimize::materialized_view::Optimizer::new(
197            catalog,
198            compute_instance,
199            output_id,
200            view_id,
201            ct.desc.iter_names().cloned().collect(),
202            non_null_assertions,
203            refresh_schedule,
204            debug_name,
205            optimizer_config,
206            self.optimizer_metrics(),
207            force_non_monotonic,
208        );
209
210        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
211        let local_mir_plan = optimizer.catch_unwind_optimize((*ct.raw_expr).clone())?;
212        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
213        let optimized_plan = global_mir_plan.df_desc().clone();
214        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
215        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan)?;
216        let (mut physical_plan, metainfo) = global_lir_plan.unapply();
217
218        // The MV optimizer is hardcoded to output a PersistSinkConnection.
219        // Sniff it and swap for the ContinualTaskSink. If/when we split out a
220        // Continual Task optimizer, this won't be necessary, and in the
221        // meantime, it seems undesirable to burden the MV optimizer with a
222        // configuration for this.
223        for sink in physical_plan.sink_exports.values_mut() {
224            match &mut sink.connection {
225                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
226                    storage_metadata,
227                    ..
228                }) => {
229                    sink.with_snapshot = ct.with_snapshot;
230                    sink.connection =
231                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
232                            input_id: ct.input_id,
233                            storage_metadata: *storage_metadata,
234                        })
235                }
236                _ => unreachable!("MV should produce persist sink connection"),
237            }
238        }
239
240        // Create a metainfo with rendered notices, preallocating a transient
241        // GlobalId for each.
242        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
243            .map(|(_item_id, global_id)| global_id)
244            .take(metainfo.optimizer_notices.len())
245            .collect();
246        let metainfo = self
247            .catalog()
248            .render_notices(metainfo, notice_ids, Some(output_id));
249
250        Ok((optimized_plan, physical_plan, metainfo, optimizer_features))
251    }
252}
253
254/// An implementation of [OptimizerCatalog] with a placeholder for the continual
255/// task to solve the self-referential CT bootstrapping problem.
256#[derive(Debug)]
257struct ContinualTaskCatalogBootstrap {
258    delegate: Arc<dyn OptimizerCatalog>,
259    sink_id: GlobalId,
260    entry: CatalogCollectionEntry,
261}
262
263impl OptimizerCatalog for ContinualTaskCatalogBootstrap {
264    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
265        if self.sink_id == *id {
266            return self.entry.clone();
267        }
268        self.delegate.get_entry(id)
269    }
270
271    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
272        self.delegate.get_entry_by_item_id(id)
273    }
274
275    fn resolve_full_name(
276        &self,
277        name: &mz_sql::names::QualifiedItemName,
278        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
279    ) -> mz_sql::names::FullItemName {
280        self.delegate.resolve_full_name(name, conn_id)
281    }
282
283    fn get_indexes_on(
284        &self,
285        id: GlobalId,
286        cluster: mz_controller_types::ClusterId,
287    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
288        self.delegate.get_indexes_on(id, cluster)
289    }
290}
291
292fn update_create_sql(
293    create_sql: &str,
294    ct_name: &FullItemName,
295    as_of: Option<&Timestamp>,
296) -> String {
297    struct ReplaceName(PartialItemName);
298    impl<'ast> VisitMut<'ast, Raw> for ReplaceName {
299        fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) {
300            let Ok(name) = unresolved_item_name(node.name().clone()) else {
301                return;
302            };
303            if name.matches(&self.0) {
304                *(node.name_mut()) = self.0.clone().into();
305            }
306        }
307    }
308
309    let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
310        .expect("non-system items must be parseable")
311        .into_element()
312        .ast;
313    match &mut ast {
314        Statement::CreateContinualTask(stmt) => {
315            // Replace any self-references in the statements with the full name,
316            // now that we have it.
317            stmt.visit_mut(&mut ReplaceName(PartialItemName::from(ct_name.clone())));
318            // Also fill in the initial as_of.
319            if let Some(as_of) = as_of {
320                stmt.as_of = Some(as_of.into());
321            }
322        }
323        _ => unreachable!("should be CREATE CONTINUAL TASK statement"),
324    }
325    ast.to_ast_string_stable()
326}
327
328/// An [OptimizerCatalog] impl that ignores any indexes that exist.
329///
330/// TODO(ct3): At the moment, the dataflow rendering for CTs only knows how to
331/// turn persist_sources into CT inputs. If the optimizer decides to use an
332/// existing index in the cluster, it won't work. It seems tricky/invasive to
333/// fix the render bug, so for now pretend indexes don't exist for CTs. Remove
334/// this once we fix the bug.
335#[derive(Debug)]
336struct NoIndexCatalog {
337    delegate: Arc<dyn OptimizerCatalog>,
338}
339
340impl OptimizerCatalog for NoIndexCatalog {
341    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
342        self.delegate.get_entry(id)
343    }
344
345    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
346        self.delegate.get_entry_by_item_id(id)
347    }
348
349    fn resolve_full_name(
350        &self,
351        name: &mz_sql::names::QualifiedItemName,
352        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
353    ) -> mz_sql::names::FullItemName {
354        self.delegate.resolve_full_name(name, conn_id)
355    }
356
357    fn get_indexes_on(
358        &self,
359        _id: GlobalId,
360        _cluster: mz_controller_types::ClusterId,
361    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
362        Box::new(std::iter::empty())
363    }
364}