use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_expr::{
AccessStrategy, AggregateExpr, AggregateFunc, Id, MirRelationExpr, OptimizedMirRelationExpr,
RowSetFinishing,
};
use mz_ore::num::NonNeg;
use mz_repr::explain::ExprHumanizer;
use mz_repr::{GlobalId, Timestamp};
use mz_sql::ast::Statement;
use mz_sql::names::Aug;
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_sql::plan::HirRelationExpr;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::EmptyStatisticsOracle;
use serde::Serialize;
use crate::catalog::Catalog;
use crate::coord::peek::{FastPathPlan, PeekPlan};
use crate::optimize::dataflows::ComputeInstanceSnapshot;
use crate::optimize::{self, Optimize, OptimizerConfig, OptimizerError};
use crate::session::SessionMeta;
use crate::TimestampContext;
/// Everything [`PlanInsights::compute_fast_path_clusters`] needs in order to
/// re-run the peek optimizer for one query against several compute instances.
#[derive(Debug)]
pub struct PlanInsightsContext {
/// Not consulted by `compute_fast_path_clusters`.
pub stmt: Option<Statement<Aug>>,
/// Expression that is cloned and handed to a fresh optimizer for every
/// compute instance.
pub raw_expr: HirRelationExpr,
/// Catalog passed to each optimizer; also used to look up the entry of an
/// index picked by a fast-path plan.
pub catalog: Arc<Catalog>,
/// Compute instances to try, keyed by name. One optimizer run is performed
/// per entry.
pub compute_instances: BTreeMap<String, ComputeInstanceSnapshot>,
/// Name of the instance that is probed with a limit of 1 (recorded in
/// `PlanInsights::fast_path_limit`) rather than reported in
/// `PlanInsights::fast_path_clusters`.
pub target_instance: String,
/// Metrics handle cloned into every optimizer.
pub metrics: OptimizerMetrics,
/// Finishing cloned for every optimizer; its `limit` is overridden for the
/// target instance only.
pub finishing: RowSetFinishing,
/// Optimizer configuration cloned into every optimizer; its
/// `features.persist_fast_path_limit` is the value reported as
/// `fast_path_limit`.
pub optimizer_config: OptimizerConfig,
/// Session metadata used when resolving the locally optimized plan.
pub session: SessionMeta,
/// Timestamp context used when resolving the locally optimized plan.
pub timestamp_context: TimestampContext<Timestamp>,
/// Passed through unchanged to the peek optimizer constructor.
pub view_id: GlobalId,
/// Passed through unchanged to the peek optimizer constructor.
pub index_id: GlobalId,
/// Not consulted by `compute_fast_path_clusters`.
/// NOTE(review): presumably callers gate on this flag before invoking it — confirm.
pub enable_re_optimize: bool,
}
/// Serializable summary of what a plan reads and which fast paths are available.
///
/// `Default` yields an instance with every collection empty and no limit.
#[derive(Clone, Debug, Default, Serialize)]
pub struct PlanInsights {
/// Collections imported by the plan, keyed by the string form of their
/// global ID (see `add_import_insights`).
pub imports: BTreeMap<String, ImportInsights>,
/// Non-target compute instances (by name) on which the query optimizes to a
/// fast-path peek. The value is `Some` when that fast path peeks an existing
/// index and `None` for any other fast-path variant.
pub fast_path_clusters: BTreeMap<String, Option<FastPathCluster>>,
/// Set to the configured `persist_fast_path_limit` when the query, with its
/// limit replaced by 1, optimizes to a fast path on the target instance.
pub fast_path_limit: Option<usize>,
/// Names of the collections for which `global_insights` found the
/// "single non-distinct count of literal true, no grouping, over a
/// persist-backed `Get`" pattern.
pub persist_count: Vec<Name>,
}
/// Describes the index that makes a query fast-path on a particular compute
/// instance.
#[derive(Clone, Debug, Serialize)]
pub struct FastPathCluster {
/// Structured name of the index selected by the fast-path plan.
index: Name,
/// Structured name of the object that index is defined on.
on: Name,
}
impl PlanInsights {
/// Re-optimizes the query once per compute instance in `ctx` and records,
/// on `self`, which of them yield a fast-path peek.
///
/// * For the instance named `ctx.target_instance` the finishing limit is
///   replaced by 1; if that optimizes to a fast path, `fast_path_limit` is
///   set to `ctx.optimizer_config.features.persist_fast_path_limit`.
/// * For every other instance whose plan is a fast path, an entry is added
///   to `fast_path_clusters`.
///
/// Instances whose task or optimization fails are skipped without reporting
/// an error.
///
/// # Panics
///
/// Panics if a `PeekExisting` fast path refers to a catalog entry for which
/// `index()` yields nothing (`expect("must be index")`).
///
/// NOTE(review): `ctx.enable_re_optimize` is not read here; presumably the
/// caller checks it — confirm.
pub async fn compute_fast_path_clusters(
&mut self,
humanizer: &dyn ExprHumanizer,
ctx: Box<PlanInsightsContext>,
) {
// Shared, type-erased session so every spawned closure can own a handle.
let session: Arc<dyn SessionMetadata + Send> = Arc::new(ctx.session);
// NOTE: `map` is lazy. Nothing is spawned by this statement; each task is
// created only when the `for` loop below pulls the next item, and that loop
// awaits the task before pulling another. The optimizer runs therefore
// happen one after another, not concurrently.
let tasks = ctx
.compute_instances
.into_iter()
.map(|(name, compute_instance)| {
let raw_expr = ctx.raw_expr.clone();
let mut finishing = ctx.finishing.clone();
// On the target instance, probe whether the query would fast-path if it
// were limited to a single row.
if name == ctx.target_instance {
finishing.limit = Some(NonNeg::try_from(1).expect("non-negitave"));
}
let session = Arc::clone(&session);
let timestamp_context = ctx.timestamp_context.clone();
// A dedicated optimizer per instance, built before spawning so the
// closure below owns everything it needs.
let mut optimizer = optimize::peek::Optimizer::new(
Arc::clone(&ctx.catalog),
compute_instance,
finishing,
ctx.view_id,
ctx.index_id,
ctx.optimizer_config.clone(),
ctx.metrics.clone(),
);
// Run the optimizer pipeline off the async task.
mz_ore::task::spawn_blocking(
|| "compute fast path clusters",
move || {
// First stage: optimize the raw expression.
let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
// Attach timestamp/session context; no statistics are supplied.
let local_mir_plan = local_mir_plan.resolve(
timestamp_context,
&*session,
Box::new(EmptyStatisticsOracle {}),
);
// Second stage: optimize the resolved plan.
let global_lir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
Ok::<_, OptimizerError>((name, global_lir_plan))
},
)
});
for task in tasks {
let res = task.await;
// Both a failed task (outer `Err`) and an optimizer error (inner `Err`)
// are ignored: insights are best-effort.
let Ok(Ok((name, plan))) = res else {
continue;
};
// Only the first component of the unapplied plan (the peek plan) is needed.
let (plan, _, _) = plan.unapply();
if let PeekPlan::FastPath(plan) = plan {
// The target instance carried the limit-1 probe: any fast path here
// is reported as the limit insight, not as a cluster insight.
if name == ctx.target_instance {
self.fast_path_limit =
Some(ctx.optimizer_config.features.persist_fast_path_limit);
continue;
}
// Only a peek of an existing index can be attributed to a named index;
// other fast-path variants are recorded with `None`.
let idx_name = if let FastPathPlan::PeekExisting(_, idx_id, _, _) = plan {
let idx_entry = ctx.catalog.get_entry_by_global_id(&idx_id);
Some(FastPathCluster {
index: structured_name(humanizer, idx_id),
on: structured_name(
humanizer,
idx_entry.index().expect("must be index").on,
),
})
} else {
None
};
self.fast_path_clusters.insert(name, idx_name);
}
}
}
}
/// Description of a single collection imported by a plan.
#[derive(Clone, Debug, Serialize)]
pub struct ImportInsights {
/// Structured name of the imported collection.
pub name: Name,
/// Kind of import; serialized under the key `type`.
#[serde(rename = "type")]
pub ty: ImportType,
}
/// Where an imported collection is read from. Variant names are serialized in
/// kebab-case.
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum ImportType {
/// Used for index imports and for fast-path peeks of an existing index.
Compute,
/// Used for source imports and for fast-path persist peeks.
Storage,
}
/// A possibly partial `database.schema.item` name.
///
/// `structured_name` fills the fields from the tail of the humanized name
/// parts (last part is `item`, the one before it `schema`, then `database`).
/// Fields that are `None` are omitted from the serialized output.
#[derive(Debug, Clone, Serialize)]
pub struct Name {
/// Database component, if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub database: Option<String>,
/// Schema component, if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<String>,
/// Item component, if present.
#[serde(skip_serializing_if = "Option::is_none")]
pub item: Option<String>,
}
/// Builds the insights for whichever plan is available.
///
/// A fast-path plan, when present, always wins: the global plan is then
/// ignored even if it was supplied. Without a fast-path plan the global plan
/// is summarized instead. Returns `None` only when neither plan is given.
pub fn plan_insights(
    humanizer: &dyn ExprHumanizer,
    global_plan: Option<DataflowDescription<OptimizedMirRelationExpr>>,
    fast_path_plan: Option<FastPathPlan>,
) -> Option<PlanInsights> {
    // Fast path takes precedence regardless of `global_plan`.
    if let Some(fast_path) = fast_path_plan {
        return Some(fast_path_insights(humanizer, fast_path));
    }
    global_plan.map(|dataflow| global_insights(humanizer, dataflow))
}
/// Summarizes a fast-path plan.
///
/// A constant plan imports nothing. Peeking an existing index records one
/// compute import; peeking persist records one storage import.
fn fast_path_insights(humanizer: &dyn ExprHumanizer, plan: FastPathPlan) -> PlanInsights {
    // Work out the single import (if any) this plan reads from.
    let import = match plan {
        FastPathPlan::Constant { .. } => None,
        FastPathPlan::PeekExisting(_, id, _, _) => Some((id, ImportType::Compute)),
        FastPathPlan::PeekPersist(id, _) => Some((id, ImportType::Storage)),
    };

    let mut result = PlanInsights::default();
    if let Some((id, ty)) = import {
        add_import_insights(&mut result, humanizer, id, ty);
    }
    result
}
fn global_insights(
humanizer: &dyn ExprHumanizer,
plan: DataflowDescription<OptimizedMirRelationExpr>,
) -> PlanInsights {
let mut insights = PlanInsights::default();
for (id, _) in plan.source_imports {
add_import_insights(&mut insights, humanizer, id, ImportType::Storage)
}
for (id, _) in plan.index_imports {
add_import_insights(&mut insights, humanizer, id, ImportType::Compute)
}
for BuildDesc { plan, .. } in plan.objects_to_build {
plan.visit_pre(|expr| {
let MirRelationExpr::Reduce {
input,
group_key,
aggregates,
..
} = expr
else {
return;
};
if !group_key.is_empty() {
return;
}
let MirRelationExpr::Project { input, outputs } = &**input else {
return;
};
if !outputs.is_empty() {
return;
}
let MirRelationExpr::Get {
id: Id::Global(id),
access_strategy: AccessStrategy::Persist,
..
} = &**input
else {
return;
};
let [aggregate] = aggregates.as_slice() else {
return;
};
let AggregateExpr {
func: AggregateFunc::Count,
distinct: false,
expr,
} = aggregate
else {
return;
};
if !expr.is_literal_true() {
return;
}
let name = structured_name(humanizer, *id);
insights.persist_count.push(name);
});
}
insights
}
/// Registers `id` as an import of kind `ty` on `insights`.
///
/// The entry is keyed by the string form of `id`; an existing entry under the
/// same key is replaced.
fn add_import_insights(
    insights: &mut PlanInsights,
    humanizer: &dyn ExprHumanizer,
    id: GlobalId,
    ty: ImportType,
) {
    let key = id.to_string();
    let name = structured_name(humanizer, id);
    insights.imports.insert(key, ImportInsights { name, ty });
}
/// Converts the humanized parts of `id` into a [`Name`].
///
/// Parts are consumed from the end: the last part becomes `item`, the one
/// before it `schema`, and the one before that `database`. Missing parts (or
/// an ID the humanizer cannot resolve) leave the corresponding fields `None`;
/// any additional leading parts are ignored.
fn structured_name(humanizer: &dyn ExprHumanizer, id: GlobalId) -> Name {
    let mut from_tail = humanizer
        .humanize_id_parts(id)
        .unwrap_or_default()
        .into_iter()
        .rev();
    let item = from_tail.next();
    let schema = from_tail.next();
    let database = from_tail.next();
    Name {
        database,
        schema,
        item,
    }
}