1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Common methods for `CONTINUAL TASK`s.
1112use std::sync::Arc;
1314use mz_catalog::memory::objects::ContinualTask;
15use mz_expr::Id;
16use mz_expr::visit::Visit;
17use mz_repr::GlobalId;
18use mz_sql::names::ResolvedIds;
19use mz_sql::plan::{self, HirRelationExpr};
20use timely::progress::Antichain;
2122use crate::AdapterError;
2324pub fn ct_item_from_plan(
25 plan: plan::CreateContinualTaskPlan,
26 global_id: GlobalId,
27 resolved_ids: ResolvedIds,
28) -> Result<ContinualTask, AdapterError> {
29let plan::CreateContinualTaskPlan {
30 name: _,
31 placeholder_id,
32 desc,
33 input_id,
34 with_snapshot,
35 continual_task:
36 plan::MaterializedView {
37 create_sql,
38 cluster_id,
39 expr: mut raw_expr,
40 dependencies,
41 column_names: _,
42 non_null_assertions: _,
43 compaction_window: _,
44 refresh_schedule: _,
45 as_of,
46 },
47 } = plan;
4849// Replace any placeholder LocalIds.
50if let Some(placeholder_id) = placeholder_id {
51 raw_expr.visit_mut_post(&mut |expr| match expr {
52 HirRelationExpr::Get { id, .. } if *id == Id::Local(placeholder_id) => {
53*id = Id::Global(global_id);
54 }
55_ => {}
56 })?;
57 }
58// TODO(alter_table): `dependencies` doesn't include the `CatalogItemId` for self and we can't
59 // look it up in the Catalog from it's `GlobalId` because we haven't yet added this item.
6061Ok(ContinualTask {
62 create_sql,
63 global_id,
64 input_id,
65 with_snapshot,
66 raw_expr: Arc::new(raw_expr),
67 desc,
68 resolved_ids,
69 dependencies,
70 cluster_id,
71 initial_as_of: as_of.map(Antichain::from_elem),
72 })
73}