1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Common methods for `CONTINUAL TASK`s.

use std::sync::Arc;

use mz_catalog::memory::objects::ContinualTask;
use mz_expr::visit::Visit;
use mz_expr::Id;
use mz_repr::GlobalId;
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{self, HirRelationExpr};
use timely::progress::Antichain;

use crate::AdapterError;

pub fn ct_item_from_plan(
    plan: plan::CreateContinualTaskPlan,
    global_id: GlobalId,
    resolved_ids: ResolvedIds,
) -> Result<ContinualTask, AdapterError> {
    let plan::CreateContinualTaskPlan {
        name: _,
        placeholder_id,
        desc,
        input_id,
        with_snapshot,
        continual_task:
            plan::MaterializedView {
                create_sql,
                cluster_id,
                expr: mut raw_expr,
                dependencies,
                column_names: _,
                non_null_assertions: _,
                compaction_window: _,
                refresh_schedule: _,
                as_of,
            },
    } = plan;

    // Replace any placeholder LocalIds.
    if let Some(placeholder_id) = placeholder_id {
        raw_expr.visit_mut_post(&mut |expr| match expr {
            HirRelationExpr::Get { id, .. } if *id == Id::Local(placeholder_id) => {
                *id = Id::Global(global_id);
            }
            _ => {}
        })?;
    }
    // TODO(alter_table): `dependencies` doesn't include the `CatalogItemId` for self and we can't
    // look it up in the Catalog from it's `GlobalId` because we haven't yet added this item.

    Ok(ContinualTask {
        create_sql,
        global_id,
        input_id,
        with_snapshot,
        raw_expr: Arc::new(raw_expr),
        desc,
        resolved_ids,
        dependencies,
        cluster_id,
        initial_as_of: as_of.map(Antichain::from_elem),
    })
}