Skip to main content

mz_adapter/
continual_task.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Common methods for `CONTINUAL TASK`s.
11
12use std::sync::Arc;
13
14use mz_catalog::memory::objects::ContinualTask;
15use mz_expr::Id;
16use mz_expr::visit::Visit;
17use mz_repr::GlobalId;
18use mz_sql::names::ResolvedIds;
19use mz_sql::plan::{self, HirRelationExpr};
20use timely::progress::Antichain;
21
22use crate::AdapterError;
23
24pub fn ct_item_from_plan(
25    plan: plan::CreateContinualTaskPlan,
26    global_id: GlobalId,
27    resolved_ids: ResolvedIds,
28) -> Result<ContinualTask, AdapterError> {
29    let plan::CreateContinualTaskPlan {
30        name: _,
31        placeholder_id,
32        desc,
33        input_id,
34        with_snapshot,
35        continual_task:
36            plan::MaterializedView {
37                create_sql,
38                cluster_id,
39                expr: mut raw_expr,
40                dependencies,
41                column_names: _,
42                replacement_target: _,
43                target_replica: _,
44                non_null_assertions: _,
45                compaction_window: _,
46                refresh_schedule: _,
47                as_of,
48            },
49    } = plan;
50
51    // Replace any placeholder LocalIds.
52    if let Some(placeholder_id) = placeholder_id {
53        raw_expr.visit_mut_post(&mut |expr| match expr {
54            HirRelationExpr::Get { id, .. } if *id == Id::Local(placeholder_id) => {
55                *id = Id::Global(global_id);
56            }
57            _ => {}
58        })?;
59    }
60    // TODO(alter_table): `dependencies` doesn't include the `CatalogItemId` for self and we can't
61    // look it up in the Catalog from it's `GlobalId` because we haven't yet added this item.
62
63    Ok(ContinualTask {
64        create_sql,
65        global_id,
66        input_id,
67        with_snapshot,
68        raw_expr: Arc::new(raw_expr),
69        desc,
70        resolved_ids,
71        dependencies,
72        cluster_id,
73        initial_as_of: as_of.map(Antichain::from_elem),
74    })
75}