mz_adapter/continual_task.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Common methods for `CONTINUAL TASK`s.
use std::sync::Arc;
use mz_catalog::memory::objects::ContinualTask;
use mz_expr::visit::Visit;
use mz_expr::Id;
use mz_repr::GlobalId;
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{self, HirRelationExpr};
use timely::progress::Antichain;
use crate::AdapterError;
/// Builds the catalog [`ContinualTask`] item described by a
/// [`plan::CreateContinualTaskPlan`].
///
/// If the plan carries a placeholder local id, every `Get` of that id in the
/// planned expression is rewritten to reference `global_id` instead. The
/// remaining plan fields are carried over into the returned item, together
/// with the supplied `global_id` and `resolved_ids`.
///
/// # Errors
///
/// Returns an error if traversing the expression to replace the placeholder
/// fails.
pub fn ct_item_from_plan(
    plan: plan::CreateContinualTaskPlan,
    global_id: GlobalId,
    resolved_ids: ResolvedIds,
) -> Result<ContinualTask, AdapterError> {
    // Peel the plan apart in two steps: first the outer plan, then the nested
    // view definition. Fields bound to `_` are not needed for the catalog item.
    let plan::CreateContinualTaskPlan {
        name: _,
        placeholder_id,
        desc,
        input_id,
        with_snapshot,
        continual_task: view,
    } = plan;
    let plan::MaterializedView {
        create_sql,
        cluster_id,
        expr: mut hir,
        dependencies,
        column_names: _,
        non_null_assertions: _,
        compaction_window: _,
        refresh_schedule: _,
        as_of,
    } = view;

    // Point any `Get` of the placeholder local id at this item's global id.
    if let Some(local_id) = placeholder_id {
        let placeholder = Id::Local(local_id);
        hir.visit_mut_post(&mut |node| {
            if let HirRelationExpr::Get { id, .. } = node {
                if *id == placeholder {
                    *id = Id::Global(global_id);
                }
            }
        })?;
    }

    let raw_expr = Arc::new(hir);
    let initial_as_of = as_of.map(Antichain::from_elem);

    // TODO(alter_table): `dependencies` doesn't include the `CatalogItemId` for self and we can't
    // look it up in the Catalog from its `GlobalId` because we haven't yet added this item.
    let item = ContinualTask {
        create_sql,
        global_id,
        input_id,
        with_snapshot,
        raw_expr,
        desc,
        resolved_ids,
        dependencies,
        cluster_id,
        initial_as_of,
    };
    Ok(item)
}