Skip to main content

mz_adapter/coord/sequencer/inner/
create_continual_task.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_catalog::memory::objects::{
13    CatalogCollectionEntry, CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
14};
15use mz_compute_types::dataflows::DataflowDescription;
16use mz_compute_types::plan::Plan;
17use mz_compute_types::sinks::{
18    ComputeSinkConnection, ContinualTaskConnection, MaterializedViewSinkConnection,
19};
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::collections::CollectionExt;
22use mz_ore::instrument;
23use mz_repr::adt::mz_acl_item::PrivilegeMap;
24use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
25use mz_repr::{
26    GlobalId, RelationVersion, RelationVersionSelector, Timestamp, VersionedRelationDesc,
27};
28use mz_sql::ast::visit_mut::{VisitMut, VisitMutNode};
29use mz_sql::ast::{Raw, RawItemName};
30use mz_sql::names::{FullItemName, PartialItemName, ResolvedIds};
31use mz_sql::normalize::unresolved_item_name;
32use mz_sql::plan;
33use mz_sql::session::metadata::SessionMetadata;
34use mz_sql_parser::ast::Statement;
35use mz_sql_parser::ast::display::AstDisplay;
36use mz_storage_client::controller::CollectionDescription;
37use mz_transform::dataflow::DataflowMetainfo;
38use mz_transform::notice::OptimizerNotice;
39
40use crate::command::ExecuteResponse;
41use crate::coord::Coordinator;
42use crate::error::AdapterError;
43use crate::optimize::dataflows::dataflow_import_id_bundle;
44use crate::optimize::{self, Optimize, OptimizerCatalog};
45use crate::util::ResultExt;
46use crate::{ExecuteContext, catalog};
47
impl Coordinator {
    /// Sequences a `CREATE CONTINUAL TASK` statement.
    ///
    /// Allocates catalog ids for the new CT, optimizes its dataflow (using a
    /// bootstrap catalog that contains a placeholder for the not-yet-created
    /// CT, since CTs may reference themselves), selects an initial `as_of`
    /// from read holds on the dataflow's inputs, writes the item to the
    /// catalog, and as side effects of that transaction creates the storage
    /// collection and ships the dataflow to the CT's cluster.
    ///
    /// Returns `ExecuteResponse::CreatedContinualTask` on success.
    #[instrument]
    pub(crate) async fn sequence_create_continual_task(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateContinualTaskPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let desc = plan.desc.clone();
        let name = plan.name.clone();
        let cluster_id = plan.continual_task.cluster_id;

        // Put a placeholder in the catalog so the optimizer can find something
        // for the sink_id.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        // The placeholder is modeled as a Table carrying the CT's output desc.
        // The optimizer only needs to resolve the sink's schema, so the
        // remaining fields are inert defaults (no SQL, no privileges, oid 0).
        let entry = CatalogEntry {
            item: CatalogItem::Table(Table {
                create_sql: None,
                desc: VersionedRelationDesc::new(desc.clone()),
                collections,
                conn_id: None,
                resolved_ids: resolved_ids.clone(),
                custom_logical_compaction_window: None,
                is_retained_metrics_object: false,
                data_source: TableDataSource::TableWrites {
                    defaults: Vec::new(),
                },
            }),
            referenced_by: Vec::new(),
            used_by: Vec::new(),
            id: item_id,
            oid: 0,
            name: name.clone(),
            owner_id: *ctx.session().current_role_id(),
            privileges: PrivilegeMap::new(),
        };
        // Wrap the real catalog so lookups of `global_id` resolve to the
        // placeholder entry during optimization.
        let bootstrap_catalog = ContinualTaskCatalogBootstrap {
            delegate: self.owned_catalog().as_optimizer_catalog(),
            sink_id: global_id,
            entry: CatalogCollectionEntry {
                entry,
                version: RelationVersionSelector::Latest,
            },
        };

        // Construct the CatalogItem for this CT and optimize it.
        let mut item = crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids)?;
        let full_name = bootstrap_catalog.resolve_full_name(&name, Some(ctx.session().conn_id()));
        let (optimized_plan, mut physical_plan, metainfo, optimizer_features) = self
            .optimize_create_continual_task(
                &item,
                global_id,
                Arc::new(bootstrap_catalog),
                full_name.to_string(),
            )?;

        // Timestamp selection
        let mut id_bundle = dataflow_import_id_bundle(&physical_plan, cluster_id.clone());
        // Can't acquire a read hold on ourselves because we don't exist yet.
        //
        // It is not necessary to take a read hold on the CT output in the
        // coordinator, since the current scheme takes read holds in the
        // coordinator only to ensure inputs don't get compacted until the
        // compute controller has installed its own read holds, which happens
        // below with the `ship_dataflow` call.
        id_bundle.storage_ids.remove(&global_id);
        let read_holds = self.acquire_read_holds(&id_bundle);
        // The earliest time readable across all inputs becomes the CT's
        // initial as_of.
        let as_of = read_holds.least_valid_read();
        physical_plan.set_as_of(as_of.clone());
        // Used in dataflow rendering to avoid the snapshot for CTs that are
        // restarted after they have committed the snapshot output.
        physical_plan.set_initial_as_of(as_of.clone());

        // Rewrite `create_sql` to reference self with the fully qualified name.
        // This is normally done when `create_sql` is created at plan time, but
        // we didn't have the necessary info in name resolution.
        item.create_sql = update_create_sql(&item.create_sql, &full_name, as_of.as_option());

        // The single catalog op that durably creates the CT item.
        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::ContinualTask(item),
            owner_id: *ctx.session().current_role_id(),
        }];

        // Commit the catalog op; the closure runs as a side effect of the
        // successful transaction: cache the plans, create the storage
        // collection at `as_of`, then install and enable the dataflow.
        let () = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    let catalog = coord.catalog_mut();
                    catalog.set_optimized_plan(global_id, optimized_plan);
                    catalog.set_physical_plan(global_id, physical_plan.clone());
                    catalog.set_dataflow_metainfo(global_id, metainfo);
                    catalog.cache_expressions(global_id, None, optimizer_features);

                    coord
                        .controller
                        .storage
                        .create_collections(
                            coord.catalog.state().storage_metadata(),
                            None,
                            vec![(
                                global_id,
                                CollectionDescription::for_other(desc, Some(as_of)),
                            )],
                        )
                        .await
                        .unwrap_or_terminate("cannot fail to append");

                    coord.ship_dataflow(physical_plan, cluster_id, None).await;
                    coord.allow_writes(cluster_id, global_id);
                })
            })
            .await?;
        Ok(ExecuteResponse::CreatedContinualTask)
    }

    /// Optimizes a continual task's definition into dataflow plans.
    ///
    /// Reuses the materialized-view optimizer pipeline (HIR ⇒ MIR ⇒ LIR),
    /// then patches the resulting sink connection from the MV persist sink to
    /// a `ContinualTaskConnection`. Indexes are hidden from the optimizer via
    /// [`NoIndexCatalog`] (see that type's TODO for why).
    ///
    /// Returns the optimized MIR dataflow, the physical (LIR) dataflow, the
    /// rendered `DataflowMetainfo`, and the `OptimizerFeatures` in effect.
    pub fn optimize_create_continual_task(
        &self,
        ct: &ContinualTask,
        output_id: GlobalId,
        catalog: Arc<dyn OptimizerCatalog>,
        debug_name: String,
    ) -> Result<
        (
            DataflowDescription<OptimizedMirRelationExpr>,
            DataflowDescription<Plan>,
            DataflowMetainfo<Arc<OptimizerNotice>>,
            OptimizerFeatures,
        ),
        AdapterError,
    > {
        let catalog = Arc::new(NoIndexCatalog { delegate: catalog });

        let (_, view_id) = self.allocate_transient_id();
        let compute_instance = self
            .instance_snapshot(ct.cluster_id)
            .expect("compute instance does not exist");
        // System-level optimizer config, overridden by the cluster's feature
        // flags.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(ct.cluster_id).config.features());
        let optimizer_features = optimizer_config.features.clone();
        let non_null_assertions = Vec::new();
        let refresh_schedule = None;
        // Continual Tasks turn an "input" into diffs by inserting retractions,
        // which removes any monotonicity properties the collection otherwise
        // would have had.
        let force_non_monotonic = [ct.input_id].into();
        let mut optimizer = optimize::materialized_view::Optimizer::new(
            catalog,
            compute_instance,
            output_id,
            view_id,
            ct.desc.iter_names().cloned().collect(),
            non_null_assertions,
            refresh_schedule,
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
            force_non_monotonic,
        );

        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
        let local_mir_plan = optimizer.catch_unwind_optimize((*ct.raw_expr).clone())?;
        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
        let optimized_plan = global_mir_plan.df_desc().clone();
        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan)?;
        let (mut physical_plan, metainfo) = global_lir_plan.unapply();

        // The MV optimizer is hardcoded to output a PersistSinkConnection.
        // Sniff it and swap for the ContinualTaskSink. If/when we split out a
        // Continual Task optimizer, this won't be necessary, and in the
        // meantime, it seems undesirable to burden the MV optimizer with a
        // configuration for this.
        for sink in physical_plan.sink_exports.values_mut() {
            match &mut sink.connection {
                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
                    storage_metadata,
                    ..
                }) => {
                    sink.with_snapshot = ct.with_snapshot;
                    sink.connection =
                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
                            input_id: ct.input_id,
                            storage_metadata: *storage_metadata,
                        })
                }
                _ => unreachable!("MV should produce persist sink connection"),
            }
        }

        // Create a metainfo with rendered notices, preallocating a transient
        // GlobalId for each.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(metainfo.optimizer_notices.len())
            .collect();
        let metainfo = self
            .catalog()
            .render_notices(metainfo, notice_ids, Some(output_id));

        Ok((optimized_plan, physical_plan, metainfo, optimizer_features))
    }
}
254
255/// An implementation of [OptimizerCatalog] with a placeholder for the continual
256/// task to solve the self-referential CT bootstrapping problem.
#[derive(Debug)]
struct ContinualTaskCatalogBootstrap {
    // The real catalog; answers every lookup other than `sink_id`.
    delegate: Arc<dyn OptimizerCatalog>,
    // GlobalId of the CT being created, which does not yet exist in the real
    // catalog.
    sink_id: GlobalId,
    // Placeholder entry returned for lookups of `sink_id`.
    entry: CatalogCollectionEntry,
}
263
264impl OptimizerCatalog for ContinualTaskCatalogBootstrap {
265    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
266        if self.sink_id == *id {
267            return self.entry.clone();
268        }
269        self.delegate.get_entry(id)
270    }
271
272    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
273        self.delegate.get_entry_by_item_id(id)
274    }
275
276    fn resolve_full_name(
277        &self,
278        name: &mz_sql::names::QualifiedItemName,
279        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
280    ) -> mz_sql::names::FullItemName {
281        self.delegate.resolve_full_name(name, conn_id)
282    }
283
284    fn get_indexes_on(
285        &self,
286        id: GlobalId,
287        cluster: mz_controller_types::ClusterId,
288    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
289        self.delegate.get_indexes_on(id, cluster)
290    }
291}
292
293fn update_create_sql(
294    create_sql: &str,
295    ct_name: &FullItemName,
296    as_of: Option<&Timestamp>,
297) -> String {
298    struct ReplaceName(PartialItemName);
299    impl<'ast> VisitMut<'ast, Raw> for ReplaceName {
300        fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) {
301            let Ok(name) = unresolved_item_name(node.name().clone()) else {
302                return;
303            };
304            if name.matches(&self.0) {
305                *(node.name_mut()) = self.0.clone().into();
306            }
307        }
308    }
309
310    let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
311        .expect("non-system items must be parseable")
312        .into_element()
313        .ast;
314    match &mut ast {
315        Statement::CreateContinualTask(stmt) => {
316            // Replace any self-references in the statements with the full name,
317            // now that we have it.
318            stmt.visit_mut(&mut ReplaceName(PartialItemName::from(ct_name.clone())));
319            // Also fill in the initial as_of.
320            if let Some(as_of) = as_of {
321                stmt.as_of = Some(as_of.into());
322            }
323        }
324        _ => unreachable!("should be CREATE CONTINUAL TASK statement"),
325    }
326    ast.to_ast_string_stable()
327}
328
329/// An [OptimizerCatalog] impl that ignores any indexes that exist.
330///
331/// TODO(ct3): At the moment, the dataflow rendering for CTs only knows how to
332/// turn persist_sources into CT inputs. If the optimizer decides to use an
333/// existing index in the cluster, it won't work. It seems tricky/invasive to
334/// fix the render bug, so for now pretend indexes don't exist for CTs. Remove
335/// this once we fix the bug.
#[derive(Debug)]
struct NoIndexCatalog {
    // The wrapped catalog; every lookup except `get_indexes_on` delegates
    // here.
    delegate: Arc<dyn OptimizerCatalog>,
}
340
341impl OptimizerCatalog for NoIndexCatalog {
342    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
343        self.delegate.get_entry(id)
344    }
345
346    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
347        self.delegate.get_entry_by_item_id(id)
348    }
349
350    fn resolve_full_name(
351        &self,
352        name: &mz_sql::names::QualifiedItemName,
353        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
354    ) -> mz_sql::names::FullItemName {
355        self.delegate.resolve_full_name(name, conn_id)
356    }
357
358    fn get_indexes_on(
359        &self,
360        _id: GlobalId,
361        _cluster: mz_controller_types::ClusterId,
362    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
363        Box::new(std::iter::empty())
364    }
365}