mz_adapter/coord/sequencer/inner/create_continual_task.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::optimize::OverrideFrom;
use mz_repr::{
    GlobalId, RelationVersion, RelationVersionSelector, Timestamp, VersionedRelationDesc,
};
use mz_sql::ast::visit_mut::{VisitMut, VisitMutNode};
use mz_sql::ast::{Raw, RawItemName};
use mz_sql::names::{FullItemName, PartialItemName, ResolvedIds};
use mz_sql::normalize::unresolved_item_name;
use mz_sql::plan;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::Statement;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::{CollectionDescription, DataSource};
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::OptimizerNotice;

use crate::command::ExecuteResponse;
use crate::coord::Coordinator;
use crate::error::AdapterError;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::util::ResultExt;
use crate::{ExecuteContext, catalog};

impl Coordinator {
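    /// Sequences a `CREATE CONTINUAL TASK` statement: allocates catalog IDs,
    /// optimizes the CT's dataflow, and then, in a single catalog transaction,
    /// creates the item along with its storage collection and compute dataflow.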
    #[instrument]
    pub(crate) async fn sequence_create_continual_task(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateContinualTaskPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let desc = plan.desc.clone();
        let name = plan.name.clone();
        let cluster_id = plan.continual_task.cluster_id;

        // Put a placeholder in the catalog so the optimizer can find something
        // for the sink_id.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        let entry = CatalogEntry {
            item: CatalogItem::Table(Table {
                create_sql: None,
                desc: VersionedRelationDesc::new(desc.clone()),
                collections,
                conn_id: None,
                resolved_ids: resolved_ids.clone(),
                custom_logical_compaction_window: None,
                is_retained_metrics_object: false,
                data_source: TableDataSource::TableWrites {
                    defaults: Vec::new(),
                },
            }),
            referenced_by: Vec::new(),
            used_by: Vec::new(),
            id: item_id,
            oid: 0,
            name: name.clone(),
            owner_id: *ctx.session().current_role_id(),
            privileges: PrivilegeMap::new(),
        };
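        // Wrap the real catalog so that lookups of the new CT's GlobalId
        // resolve to the placeholder entry above.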
        let bootstrap_catalog = ContinualTaskCatalogBootstrap {
            delegate: self.owned_catalog().as_optimizer_catalog(),
            sink_id: global_id,
            entry: CatalogCollectionEntry {
                entry,
                version: RelationVersionSelector::Latest,
            },
        };

        // Construct the CatalogItem for this CT and optimize it.
        let mut item = crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids)?;
        let full_name = bootstrap_catalog.resolve_full_name(&name, Some(ctx.session().conn_id()));
        let (optimized_plan, mut physical_plan, metainfo) = self.optimize_create_continual_task(
            &item,
            global_id,
            Arc::new(bootstrap_catalog),
            full_name.to_string(),
        )?;

        // Timestamp selection
        let mut id_bundle = dataflow_import_id_bundle(&physical_plan, cluster_id.clone());
        // Can't acquire a read hold on ourselves because we don't exist yet.
        //
        // It is not necessary to take a read hold on the CT output in the
        // coordinator, since the current scheme takes read holds in the
        // coordinator only to ensure inputs don't get compacted until the
        // compute controller has installed its own read holds, which happens
        // below with the `ship_dataflow` call.
        id_bundle.storage_ids.remove(&global_id);
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();
        physical_plan.set_as_of(as_of.clone());
        // Used in dataflow rendering to avoid the snapshot for CTs that are
        // restarted after they have committed the snapshot output.
        physical_plan.set_initial_as_of(as_of.clone());

        // Rewrite `create_sql` to reference self with the fully qualified name.
        // This is normally done when `create_sql` is created at plan time, but
        // we didn't have the necessary info in name resolution.
        item.create_sql = update_create_sql(&item.create_sql, &full_name, as_of.as_option());

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::ContinualTask(item),
            owner_id: *ctx.session().current_role_id(),
        }];

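        // Commit the catalog transaction. As side effects, register the
        // optimized and physical plans with the catalog, create the backing
        // storage collection at the chosen `as_of`, and ship the dataflow to
        // the CT's cluster.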
        let () = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    let catalog = coord.catalog_mut();
                    catalog.set_optimized_plan(global_id, optimized_plan);
                    catalog.set_physical_plan(global_id, physical_plan.clone());
                    catalog.set_dataflow_metainfo(global_id, metainfo);

                    coord
                        .controller
                        .storage
                        .create_collections(
                            coord.catalog.state().storage_metadata(),
                            None,
                            vec![(
                                global_id,
                                CollectionDescription {
                                    desc,
                                    data_source: DataSource::Other,
                                    since: Some(as_of),
                                    status_collection_id: None,
                                    timeline: None,
                                },
                            )],
                        )
                        .await
                        .unwrap_or_terminate("cannot fail to append");

                    coord.ship_dataflow(physical_plan, cluster_id, None).await;
                    coord.allow_writes(cluster_id, global_id);
                })
            })
            .await?;
        Ok(ExecuteResponse::CreatedContinualTask)
    }

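    /// Optimizes the dataflow for a continual task by reusing the materialized
    /// view optimizer and then swapping the resulting materialized view sink
    /// for a continual task sink.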
    pub fn optimize_create_continual_task(
        &self,
        ct: &ContinualTask,
        output_id: GlobalId,
        catalog: Arc<dyn OptimizerCatalog>,
        debug_name: String,
    ) -> Result<
        (
            DataflowDescription<OptimizedMirRelationExpr>,
            DataflowDescription<Plan>,
            DataflowMetainfo<Arc<OptimizerNotice>>,
        ),
        AdapterError,
    > {
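        // Hide any existing indexes from the optimizer; see `NoIndexCatalog`
        // below for why CT rendering can't use them yet.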
        let catalog = Arc::new(NoIndexCatalog { delegate: catalog });

        let (_, view_id) = self.allocate_transient_id();
        let compute_instance = self
            .instance_snapshot(ct.cluster_id)
            .expect("compute instance does not exist");
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(ct.cluster_id).config.features());
        let non_null_assertions = Vec::new();
        let refresh_schedule = None;
        // Continual Tasks turn an "input" into diffs by inserting retractions,
        // which removes any monotonicity properties the collection otherwise
        // would have had.
        let force_non_monotonic = [ct.input_id].into();
        let mut optimizer = optimize::materialized_view::Optimizer::new(
            catalog,
            compute_instance,
            output_id,
            view_id,
            ct.desc.iter_names().cloned().collect(),
            non_null_assertions,
            refresh_schedule,
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
            force_non_monotonic,
        );

        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
        let local_mir_plan = optimizer.catch_unwind_optimize((*ct.raw_expr).clone())?;
        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
        let optimized_plan = global_mir_plan.df_desc().clone();
        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan)?;
        let (mut physical_plan, metainfo) = global_lir_plan.unapply();

        // The MV optimizer is hardcoded to output a PersistSinkConnection.
        // Sniff it and swap for the ContinualTaskSink. If/when we split out a
        // Continual Task optimizer, this won't be necessary, and in the
        // meantime, it seems undesirable to burden the MV optimizer with a
        // configuration for this.
        for sink in physical_plan.sink_exports.values_mut() {
            match &mut sink.connection {
                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
                    storage_metadata,
                    ..
                }) => {
                    sink.with_snapshot = ct.with_snapshot;
                    sink.connection =
                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
                            input_id: ct.input_id,
                            storage_metadata: *storage_metadata,
                        })
                }
                _ => unreachable!("MV should produce persist sink connection"),
            }
        }

        // Create a metainfo with rendered notices, preallocating a transient
        // GlobalId for each.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(metainfo.optimizer_notices.len())
            .collect();
        let metainfo = self
            .catalog()
            .render_notices(metainfo, notice_ids, Some(output_id));

        Ok((optimized_plan, physical_plan, metainfo))
    }
}

/// An implementation of [OptimizerCatalog] with a placeholder for the continual
/// task to solve the self-referential CT bootstrapping problem.
#[derive(Debug)]
struct ContinualTaskCatalogBootstrap {
    delegate: Arc<dyn OptimizerCatalog>,
    sink_id: GlobalId,
    entry: CatalogCollectionEntry,
}

impl OptimizerCatalog for ContinualTaskCatalogBootstrap {
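    // Return the placeholder entry for the in-progress CT's own sink id;
    // delegate all other lookups to the real catalog.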
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        if self.sink_id == *id {
            return self.entry.clone();
        }
        self.delegate.get_entry(id)
    }

    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
        self.delegate.get_entry_by_item_id(id)
    }

    fn resolve_full_name(
        &self,
        name: &mz_sql::names::QualifiedItemName,
        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
    ) -> mz_sql::names::FullItemName {
        self.delegate.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        id: GlobalId,
        cluster: mz_controller_types::ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
        self.delegate.get_indexes_on(id, cluster)
    }
}

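/// Rewrites a continual task's `create_sql` so that any self-references use
/// the fully qualified name and so that the initial `as_of` is recorded in
/// the statement.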
fn update_create_sql(
    create_sql: &str,
    ct_name: &FullItemName,
    as_of: Option<&Timestamp>,
) -> String {
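    // AST visitor that rewrites every item name matching the CT's partial
    // name to the fully qualified name.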
    struct ReplaceName(PartialItemName);
    impl<'ast> VisitMut<'ast, Raw> for ReplaceName {
        fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) {
            let Ok(name) = unresolved_item_name(node.name().clone()) else {
                return;
            };
            if name.matches(&self.0) {
                *(node.name_mut()) = self.0.clone().into();
            }
        }
    }

    let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
        .expect("non-system items must be parseable")
        .into_element()
        .ast;
    match &mut ast {
        Statement::CreateContinualTask(stmt) => {
            // Replace any self-references in the statements with the full name,
            // now that we have it.
            stmt.visit_mut(&mut ReplaceName(PartialItemName::from(ct_name.clone())));
            // Also fill in the initial as_of.
            if let Some(as_of) = as_of {
                stmt.as_of = Some(as_of.into());
            }
        }
        _ => unreachable!("should be CREATE CONTINUAL TASK statement"),
    }
    ast.to_ast_string_stable()
}

/// An [OptimizerCatalog] impl that ignores any indexes that exist.
///
/// TODO(ct3): At the moment, the dataflow rendering for CTs only knows how to
/// turn persist_sources into CT inputs. If the optimizer decides to use an
/// existing index in the cluster, it won't work. It seems tricky/invasive to
/// fix the render bug, so for now pretend indexes don't exist for CTs. Remove
/// this once we fix the bug.
#[derive(Debug)]
struct NoIndexCatalog {
    delegate: Arc<dyn OptimizerCatalog>,
}

impl OptimizerCatalog for NoIndexCatalog {
    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
        self.delegate.get_entry(id)
    }

    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
        self.delegate.get_entry_by_item_id(id)
    }

    fn resolve_full_name(
        &self,
        name: &mz_sql::names::QualifiedItemName,
        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
    ) -> mz_sql::names::FullItemName {
        self.delegate.resolve_full_name(name, conn_id)
    }

    fn get_indexes_on(
        &self,
        _id: GlobalId,
        _cluster: mz_controller_types::ClusterId,
    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
        Box::new(std::iter::empty())
    }
}