use std::sync::Arc;
use std::time::{Duration, Instant};
use mz_compute_types::plan::Plan;
use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc, PersistSinkConnection};
use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr};
use mz_repr::explain::trace_plan;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{ColumnName, GlobalId, RelationDesc};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::HirRelationExpr;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::normalize_lets::normalize_lets;
use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
use mz_transform::TransformCtx;
use timely::progress::Antichain;
use crate::catalog::Catalog;
use crate::optimize::dataflows::{
prep_relation_expr, prep_scalar_expr, ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle,
};
use crate::optimize::{
optimize_mir_local, trace_plan, LirDataflowDescription, MirDataflowDescription, Optimize,
OptimizeMode, OptimizerConfig, OptimizerError,
};
/// Optimizer pipeline for a materialized view.
///
/// Each stage is a separate `Optimize` impl on this type:
/// HIR → local MIR ([`LocalMirPlan`]) → global MIR ([`GlobalMirPlan`]) →
/// LIR ([`GlobalLirPlan`]). The resulting dataflow imports the view `view_id`
/// and exports it into a persist sink with id `sink_id`.
pub struct Optimizer {
    /// Typechecking context shared by the local and global transform contexts.
    typecheck_ctx: TypecheckContext,
    /// Catalog whose state backs the `DataflowBuilder` used in global optimization.
    catalog: Arc<Catalog>,
    /// Snapshot of the target compute instance, handed to the `DataflowBuilder`.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the persist sink is exported.
    sink_id: GlobalId,
    /// Id of the view imported into the dataflow and read by the sink.
    view_id: GlobalId,
    /// Output column names used to build the sink's `RelationDesc`.
    column_names: Vec<ColumnName>,
    /// Column indices asserted to be non-nullable; applied to the output type
    /// and also forwarded to the sink description.
    non_null_assertions: Vec<usize>,
    /// Optional refresh schedule, copied onto both the dataflow and the sink.
    refresh_schedule: Option<RefreshSchedule>,
    /// Name given to the assembled dataflow description.
    debug_name: String,
    /// Optimizer configuration (feature flags and optimization mode).
    config: OptimizerConfig,
    /// Metrics handle; used during HIR lowering and to report the end-to-end
    /// optimization time under the `materialized_view` label.
    metrics: OptimizerMetrics,
    /// Wall-clock time accumulated across all optimization stages run so far.
    duration: Duration,
}
impl Optimizer {
pub fn new(
catalog: Arc<Catalog>,
compute_instance: ComputeInstanceSnapshot,
sink_id: GlobalId,
view_id: GlobalId,
column_names: Vec<ColumnName>,
non_null_assertions: Vec<usize>,
refresh_schedule: Option<RefreshSchedule>,
debug_name: String,
config: OptimizerConfig,
metrics: OptimizerMetrics,
) -> Self {
Self {
typecheck_ctx: empty_context(),
catalog,
compute_instance,
sink_id,
view_id,
column_names,
non_null_assertions,
refresh_schedule,
debug_name,
config,
metrics,
duration: Default::default(),
}
}
}
/// Result of the local optimization stage: a locally optimized MIR expression
/// together with the dataflow metainfo gathered so far.
#[derive(Clone, Debug)]
pub struct LocalMirPlan {
    /// The locally optimized MIR expression.
    expr: MirRelationExpr,
    /// Dataflow metainfo accumulated so far. It is empty when the plan is built
    /// from an already-optimized expression.
    df_meta: DataflowMetainfo,
}
/// Result of the global optimization stage: a MIR dataflow description
/// (view import plus persist sink export) and its metainfo.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The globally optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metainfo accumulated through local and global optimization.
    df_meta: DataflowMetainfo,
}
impl GlobalMirPlan {
    /// Borrows the MIR dataflow description assembled by global optimization.
    pub fn df_desc(&self) -> &MirDataflowDescription {
        let Self { df_desc, .. } = self;
        df_desc
    }
}
/// Final result of the pipeline: the LIR dataflow description produced by
/// `Plan::finalize_dataflow`, together with its metainfo.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metainfo carried over from the MIR stages.
    df_meta: DataflowMetainfo,
}
impl GlobalLirPlan {
    /// Borrows the finalized LIR dataflow description.
    pub fn df_desc(&self) -> &LirDataflowDescription {
        let Self { df_desc, .. } = self;
        df_desc
    }

    /// Borrows the dataflow metainfo accumulated during optimization.
    pub fn df_meta(&self) -> &DataflowMetainfo {
        let Self { df_meta, .. } = self;
        df_meta
    }

    /// Returns the `from_desc` of the first sink export in the dataflow.
    ///
    /// The global MIR stage of this optimizer exports exactly one sink.
    ///
    /// # Panics
    ///
    /// Panics with `"valid sink"` if the dataflow has no sink exports.
    pub fn desc(&self) -> &RelationDesc {
        self.df_desc
            .sink_exports
            .values()
            .map(|sink| &sink.from_desc)
            .next()
            .expect("valid sink")
    }
}
impl Optimize<HirRelationExpr> for Optimizer {
    type To = LocalMirPlan;

    /// Local optimization stage.
    ///
    /// Lowers the HIR expression to MIR and runs the local MIR transformation
    /// pipeline. The time spent is added to the optimizer's accumulated
    /// duration.
    fn optimize(&mut self, expr: HirRelationExpr) -> Result<Self::To, OptimizerError> {
        let started = Instant::now();

        // Record the unmodified input under the "raw" trace path.
        trace_plan!(at: "raw", &expr);

        // HIR ⇒ MIR lowering.
        let mir = expr.lower(&self.config, Some(&self.metrics))?;

        // Local MIR ⇒ MIR transforms; the context records metainfo into `df_meta`.
        let mut df_meta = DataflowMetainfo::default();
        let optimized = {
            let mut ctx = TransformCtx::local(
                &self.config.features,
                &self.typecheck_ctx,
                &mut df_meta,
            );
            optimize_mir_local(mir, &mut ctx)?.into_inner()
        };

        self.duration += started.elapsed();
        Ok(LocalMirPlan {
            expr: optimized,
            df_meta,
        })
    }
}
impl LocalMirPlan {
    /// Returns an owned copy of the locally optimized expression, wrapped as an
    /// [`OptimizedMirRelationExpr`]. The plan itself is left untouched.
    pub fn expr(&self) -> OptimizedMirRelationExpr {
        let inner = self.expr.clone();
        OptimizedMirRelationExpr(inner)
    }
}
impl Optimize<OptimizedMirRelationExpr> for Optimizer {
    type To = GlobalMirPlan;

    /// Entry point for callers that already hold a locally optimized MIR
    /// expression.
    ///
    /// Wraps the expression in a [`LocalMirPlan`] with fresh (default)
    /// metainfo and delegates to the `LocalMirPlan` stage.
    fn optimize(&mut self, expr: OptimizedMirRelationExpr) -> Result<Self::To, OptimizerError> {
        let plan = LocalMirPlan {
            expr: expr.into_inner(),
            df_meta: DataflowMetainfo::default(),
        };
        self.optimize(plan)
    }
}
impl Optimize<LocalMirPlan> for Optimizer {
    type To = GlobalMirPlan;

    /// Global optimization stage.
    ///
    /// Assembles a MIR dataflow that imports the view `view_id`, exports it into
    /// a persist sink with id `sink_id`, prepares its expressions, and runs the
    /// global MIR transformation pipeline over the whole dataflow. The time spent
    /// is added to the optimizer's accumulated duration.
    ///
    /// # Panics
    ///
    /// Indexing `rel_typ.column_types[i]` panics if an entry of
    /// `non_null_assertions` is out of range for the expression's columns.
    /// NOTE(review): presumably callers validate these indices upstream — confirm.
    fn optimize(&mut self, plan: LocalMirPlan) -> Result<Self::To, OptimizerError> {
        let time = Instant::now();
        let expr = OptimizedMirRelationExpr(plan.expr);
        let mut df_meta = plan.df_meta;

        // Derive the output type, forcing the asserted columns to be
        // non-nullable, then attach column names to get the sink's description.
        let mut rel_typ = expr.typ();
        for &i in self.non_null_assertions.iter() {
            rel_typ.column_types[i].nullable = false;
        }
        let rel_desc = RelationDesc::new(rel_typ, self.column_names.clone());

        // Dataflow builder over the current catalog state and the target
        // compute instance snapshot.
        let mut df_builder = {
            let catalog = self.catalog.state();
            let compute = self.compute_instance.clone();
            DataflowBuilder::new(catalog, compute).with_config(&self.config)
        };
        let mut df_desc = MirDataflowDescription::new(self.debug_name.clone());
        // The dataflow carries the same refresh schedule as the sink below.
        df_desc.refresh_schedule.clone_from(&self.refresh_schedule);

        // Import the view under `view_id` with the locally optimized `expr`,
        // then give the builder a chance to re-optimize imported views per `config`.
        df_builder.import_view_into_dataflow(
            &self.view_id,
            &expr,
            &mut df_desc,
            &self.config.features,
        )?;
        df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;

        // Persist sink reading from `view_id`. The same relation description is
        // used as the sink input (`from_desc`) and the persisted value (`value_desc`);
        // `up_to` is left as an empty `Antichain`.
        let sink_description = ComputeSinkDesc {
            from: self.view_id,
            from_desc: rel_desc.clone(),
            connection: ComputeSinkConnection::Persist(PersistSinkConnection {
                value_desc: rel_desc,
                storage_metadata: (),
            }),
            with_snapshot: true,
            up_to: Antichain::default(),
            non_null_assertions: self.non_null_assertions.clone(),
            refresh_schedule: self.refresh_schedule.clone(),
        };
        df_desc.export_sink(self.sink_id, sink_description);

        // Prepare every relation and scalar expression in the assembled dataflow
        // using the `Index` preparation style.
        let style = ExprPrepStyle::Index;
        df_desc.visit_children(
            |r| prep_relation_expr(r, style),
            |s| prep_scalar_expr(s, style),
        )?;

        // Global (whole-dataflow) MIR optimization; metainfo accumulates into `df_meta`.
        let mut transform_ctx = TransformCtx::global(
            &df_builder,
            // No real statistics are supplied here; an empty oracle is passed.
            &mz_transform::EmptyStatisticsOracle,
            &self.config.features,
            &self.typecheck_ctx,
            &mut df_meta,
        );
        // NOTE(review): the meaning of the trailing `false` flag is defined by
        // `optimize_dataflow` (presumably a fast-path switch) — confirm there.
        mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;

        // In EXPLAIN mode, trace the indexes used by the dataflow at this point.
        if self.config.mode == OptimizeMode::Explain {
            trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
        }

        self.duration += time.elapsed();
        Ok(GlobalMirPlan { df_desc, df_meta })
    }
}
impl Optimize<GlobalMirPlan> for Optimizer {
    type To = GlobalLirPlan;

    /// Final stage: MIR dataflow ⇒ LIR dataflow.
    ///
    /// Normalizes `let` bindings in every object to build, stopping at the
    /// first error. Then it finalizes the dataflow into LIR via
    /// `Plan::finalize_dataflow` and traces the result. Finally it reports the
    /// total time accumulated across all stages to `metrics` under the
    /// `materialized_view` label.
    fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
        let started = Instant::now();
        let GlobalMirPlan {
            df_desc: mut mir_desc,
            df_meta,
        } = plan;

        // Every expression must be let-normalized before finalization.
        mir_desc
            .objects_to_build
            .iter_mut()
            .try_for_each(|build| normalize_lets(&mut build.plan.0, &self.config.features))?;

        // MIR ⇒ LIR lowering (plus any LIR-level passes done by finalization).
        let lir_desc = Plan::finalize_dataflow(mir_desc, &self.config.features)?;
        trace_plan(&lir_desc);

        self.duration += started.elapsed();
        self.metrics
            .observe_e2e_optimization_time("materialized_view", self.duration);

        Ok(GlobalLirPlan {
            df_desc: lir_desc,
            df_meta,
        })
    }
}
impl GlobalLirPlan {
    /// Consumes the plan and returns its parts: the LIR dataflow description
    /// and the accumulated dataflow metainfo.
    pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
        let Self { df_desc, df_meta } = self;
        (df_desc, df_meta)
    }
}