mz_adapter/coord/sequencer/inner/
create_continual_task.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_catalog::memory::objects::{
13    CatalogCollectionEntry, CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
14};
15use mz_compute_types::dataflows::DataflowDescription;
16use mz_compute_types::plan::Plan;
17use mz_compute_types::sinks::{
18    ComputeSinkConnection, ContinualTaskConnection, MaterializedViewSinkConnection,
19};
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::collections::CollectionExt;
22use mz_ore::instrument;
23use mz_repr::adt::mz_acl_item::PrivilegeMap;
24use mz_repr::optimize::OverrideFrom;
25use mz_repr::{
26    GlobalId, RelationVersion, RelationVersionSelector, Timestamp, VersionedRelationDesc,
27};
28use mz_sql::ast::visit_mut::{VisitMut, VisitMutNode};
29use mz_sql::ast::{Raw, RawItemName};
30use mz_sql::names::{FullItemName, PartialItemName, ResolvedIds};
31use mz_sql::normalize::unresolved_item_name;
32use mz_sql::plan;
33use mz_sql::session::metadata::SessionMetadata;
34use mz_sql_parser::ast::Statement;
35use mz_sql_parser::ast::display::AstDisplay;
36use mz_storage_client::controller::{CollectionDescription, DataSource};
37use mz_transform::dataflow::DataflowMetainfo;
38use mz_transform::notice::OptimizerNotice;
39
40use crate::command::ExecuteResponse;
41use crate::coord::Coordinator;
42use crate::error::AdapterError;
43use crate::optimize::dataflows::dataflow_import_id_bundle;
44use crate::optimize::{self, Optimize, OptimizerCatalog};
45use crate::util::ResultExt;
46use crate::{ExecuteContext, catalog};
47
48impl Coordinator {
49    #[instrument]
50    pub(crate) async fn sequence_create_continual_task(
51        &mut self,
52        ctx: &mut ExecuteContext,
53        plan: plan::CreateContinualTaskPlan,
54        resolved_ids: ResolvedIds,
55    ) -> Result<ExecuteResponse, AdapterError> {
56        let desc = plan.desc.clone();
57        let name = plan.name.clone();
58        let cluster_id = plan.continual_task.cluster_id;
59
60        // Put a placeholder in the catalog so the optimizer can find something
61        // for the sink_id.
62        let id_ts = self.get_catalog_write_ts().await;
63        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
64        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
65
66        let entry = CatalogEntry {
67            item: CatalogItem::Table(Table {
68                create_sql: None,
69                desc: VersionedRelationDesc::new(desc.clone()),
70                collections,
71                conn_id: None,
72                resolved_ids: resolved_ids.clone(),
73                custom_logical_compaction_window: None,
74                is_retained_metrics_object: false,
75                data_source: TableDataSource::TableWrites {
76                    defaults: Vec::new(),
77                },
78            }),
79            referenced_by: Vec::new(),
80            used_by: Vec::new(),
81            id: item_id,
82            oid: 0,
83            name: name.clone(),
84            owner_id: *ctx.session().current_role_id(),
85            privileges: PrivilegeMap::new(),
86        };
87        let bootstrap_catalog = ContinualTaskCatalogBootstrap {
88            delegate: self.owned_catalog().as_optimizer_catalog(),
89            sink_id: global_id,
90            entry: CatalogCollectionEntry {
91                entry,
92                version: RelationVersionSelector::Latest,
93            },
94        };
95
96        // Construct the CatalogItem for this CT and optimize it.
97        let mut item = crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids)?;
98        let full_name = bootstrap_catalog.resolve_full_name(&name, Some(ctx.session().conn_id()));
99        let (optimized_plan, mut physical_plan, metainfo) = self.optimize_create_continual_task(
100            &item,
101            global_id,
102            Arc::new(bootstrap_catalog),
103            full_name.to_string(),
104        )?;
105
106        // Timestamp selection
107        let mut id_bundle = dataflow_import_id_bundle(&physical_plan, cluster_id.clone());
108        // Can't acquire a read hold on ourselves because we don't exist yet.
109        //
110        // It is not necessary to take a read hold on the CT output in the
111        // coordinator, since the current scheme takes read holds in the
112        // coordinator only to ensure inputs don't get compacted until the
113        // compute controller has installed its own read holds, which happens
114        // below with the `ship_dataflow` call.
115        id_bundle.storage_ids.remove(&global_id);
116        let read_holds = self.acquire_read_holds(&id_bundle);
117        let as_of = read_holds.least_valid_read();
118        physical_plan.set_as_of(as_of.clone());
119        // Used in dataflow rendering to avoid the snapshot for CTs that are
120        // restarted after they have committed the snapshot output.
121        physical_plan.set_initial_as_of(as_of.clone());
122
123        // Rewrite `create_sql` to reference self with the fully qualified name.
124        // This is normally done when `create_sql` is created at plan time, but
125        // we didn't have the necessary info in name resolution.
126        item.create_sql = update_create_sql(&item.create_sql, &full_name, as_of.as_option());
127
128        let ops = vec![catalog::Op::CreateItem {
129            id: item_id,
130            name: name.clone(),
131            item: CatalogItem::ContinualTask(item),
132            owner_id: *ctx.session().current_role_id(),
133        }];
134
135        let () = self
136            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
137                Box::pin(async move {
138                    let catalog = coord.catalog_mut();
139                    catalog.set_optimized_plan(global_id, optimized_plan);
140                    catalog.set_physical_plan(global_id, physical_plan.clone());
141                    catalog.set_dataflow_metainfo(global_id, metainfo);
142
143                    coord
144                        .controller
145                        .storage
146                        .create_collections(
147                            coord.catalog.state().storage_metadata(),
148                            None,
149                            vec![(
150                                global_id,
151                                CollectionDescription {
152                                    desc,
153                                    data_source: DataSource::Other,
154                                    since: Some(as_of),
155                                    status_collection_id: None,
156                                    timeline: None,
157                                },
158                            )],
159                        )
160                        .await
161                        .unwrap_or_terminate("cannot fail to append");
162
163                    coord.ship_dataflow(physical_plan, cluster_id, None).await;
164                })
165            })
166            .await?;
167        Ok(ExecuteResponse::CreatedContinualTask)
168    }
169
170    pub fn optimize_create_continual_task(
171        &self,
172        ct: &ContinualTask,
173        output_id: GlobalId,
174        catalog: Arc<dyn OptimizerCatalog>,
175        debug_name: String,
176    ) -> Result<
177        (
178            DataflowDescription<OptimizedMirRelationExpr>,
179            DataflowDescription<Plan>,
180            DataflowMetainfo<Arc<OptimizerNotice>>,
181        ),
182        AdapterError,
183    > {
184        let catalog = Arc::new(NoIndexCatalog { delegate: catalog });
185
186        let (_, view_id) = self.allocate_transient_id();
187        let compute_instance = self
188            .instance_snapshot(ct.cluster_id)
189            .expect("compute instance does not exist");
190        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
191            .override_from(&self.catalog.get_cluster(ct.cluster_id).config.features());
192        let non_null_assertions = Vec::new();
193        let refresh_schedule = None;
194        // Continual Tasks turn an "input" into diffs by inserting retractions,
195        // which removes any monotonicity properties the collection otherwise
196        // would have had.
197        let force_non_monotonic = [ct.input_id].into();
198        let mut optimizer = optimize::materialized_view::Optimizer::new(
199            catalog,
200            compute_instance,
201            output_id,
202            view_id,
203            ct.desc.iter_names().cloned().collect(),
204            non_null_assertions,
205            refresh_schedule,
206            debug_name,
207            optimizer_config,
208            self.optimizer_metrics(),
209            force_non_monotonic,
210        );
211
212        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
213        let local_mir_plan = optimizer.catch_unwind_optimize((*ct.raw_expr).clone())?;
214        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
215        let optimized_plan = global_mir_plan.df_desc().clone();
216        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
217        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan)?;
218        let (mut physical_plan, metainfo) = global_lir_plan.unapply();
219
220        // The MV optimizer is hardcoded to output a PersistSinkConnection.
221        // Sniff it and swap for the ContinualTaskSink. If/when we split out a
222        // Continual Task optimizer, this won't be necessary, and in the
223        // meantime, it seems undesirable to burden the MV optimizer with a
224        // configuration for this.
225        for sink in physical_plan.sink_exports.values_mut() {
226            match &mut sink.connection {
227                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
228                    storage_metadata,
229                    ..
230                }) => {
231                    sink.with_snapshot = ct.with_snapshot;
232                    sink.connection =
233                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
234                            input_id: ct.input_id,
235                            storage_metadata: *storage_metadata,
236                        })
237                }
238                _ => unreachable!("MV should produce persist sink connection"),
239            }
240        }
241
242        // Create a metainfo with rendered notices, preallocating a transient
243        // GlobalId for each.
244        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
245            .map(|(_item_id, global_id)| global_id)
246            .take(metainfo.optimizer_notices.len())
247            .collect();
248        let metainfo = self
249            .catalog()
250            .render_notices(metainfo, notice_ids, Some(output_id));
251
252        Ok((optimized_plan, physical_plan, metainfo))
253    }
254}
255
256/// An implementation of [OptimizerCatalog] with a placeholder for the continual
257/// task to solve the self-referential CT bootstrapping problem.
258#[derive(Debug)]
259struct ContinualTaskCatalogBootstrap {
260    delegate: Arc<dyn OptimizerCatalog>,
261    sink_id: GlobalId,
262    entry: CatalogCollectionEntry,
263}
264
265impl OptimizerCatalog for ContinualTaskCatalogBootstrap {
266    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
267        if self.sink_id == *id {
268            return self.entry.clone();
269        }
270        self.delegate.get_entry(id)
271    }
272
273    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
274        self.delegate.get_entry_by_item_id(id)
275    }
276
277    fn resolve_full_name(
278        &self,
279        name: &mz_sql::names::QualifiedItemName,
280        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
281    ) -> mz_sql::names::FullItemName {
282        self.delegate.resolve_full_name(name, conn_id)
283    }
284
285    fn get_indexes_on(
286        &self,
287        id: GlobalId,
288        cluster: mz_controller_types::ClusterId,
289    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
290        self.delegate.get_indexes_on(id, cluster)
291    }
292}
293
294fn update_create_sql(
295    create_sql: &str,
296    ct_name: &FullItemName,
297    as_of: Option<&Timestamp>,
298) -> String {
299    struct ReplaceName(PartialItemName);
300    impl<'ast> VisitMut<'ast, Raw> for ReplaceName {
301        fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) {
302            let Ok(name) = unresolved_item_name(node.name().clone()) else {
303                return;
304            };
305            if name.matches(&self.0) {
306                *(node.name_mut()) = self.0.clone().into();
307            }
308        }
309    }
310
311    let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
312        .expect("non-system items must be parseable")
313        .into_element()
314        .ast;
315    match &mut ast {
316        Statement::CreateContinualTask(stmt) => {
317            // Replace any self-references in the statements with the full name,
318            // now that we have it.
319            stmt.visit_mut(&mut ReplaceName(PartialItemName::from(ct_name.clone())));
320            // Also fill in the initial as_of.
321            if let Some(as_of) = as_of {
322                stmt.as_of = Some(as_of.into());
323            }
324        }
325        _ => unreachable!("should be CREATE CONTINUAL TASK statement"),
326    }
327    ast.to_ast_string_stable()
328}
329
330/// An [OptimizerCatalog] impl that ignores any indexes that exist.
331///
332/// TODO(ct3): At the moment, the dataflow rendering for CTs only knows how to
333/// turn persist_sources into CT inputs. If the optimizer decides to use an
334/// existing index in the cluster, it won't work. It seems tricky/invasive to
335/// fix the render bug, so for now pretend indexes don't exist for CTs. Remove
336/// this once we fix the bug.
337#[derive(Debug)]
338struct NoIndexCatalog {
339    delegate: Arc<dyn OptimizerCatalog>,
340}
341
342impl OptimizerCatalog for NoIndexCatalog {
343    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
344        self.delegate.get_entry(id)
345    }
346
347    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
348        self.delegate.get_entry_by_item_id(id)
349    }
350
351    fn resolve_full_name(
352        &self,
353        name: &mz_sql::names::QualifiedItemName,
354        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
355    ) -> mz_sql::names::FullItemName {
356        self.delegate.resolve_full_name(name, conn_id)
357    }
358
359    fn get_indexes_on(
360        &self,
361        _id: GlobalId,
362        _cluster: mz_controller_types::ClusterId,
363    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
364        Box::new(std::iter::empty())
365    }
366}