// mz_sql/optimizer_metrics.rs
use std::cell::RefCell;
use std::time::Duration;

use mz_ore::metric;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::stats::histogram_seconds_buckets;
use prometheus::{HistogramVec, IntCounterVec};

20#[derive(Debug, Clone)]
22pub struct OptimizerMetrics {
23 e2e_optimization_time_seconds: HistogramVec,
24 e2e_optimization_time_seconds_log_threshold: Duration,
25 outer_join_lowering_cases: IntCounterVec,
26 transform_hits: IntCounterVec,
27 transform_total: IntCounterVec,
28 transform_time_seconds: RefCell<std::collections::BTreeMap<String, Vec<Duration>>>,
31}
32
33impl OptimizerMetrics {
34 pub fn register_into(
35 registry: &MetricsRegistry,
36 e2e_optimization_time_seconds_log_threshold: Duration,
37 ) -> Self {
38 Self {
39 e2e_optimization_time_seconds: registry.register(metric!(
40 name: "mz_optimizer_e2e_optimization_time_seconds",
41 help: "A histogram of end-to-end optimization times since restart.",
42 var_labels: ["object_type"],
43 buckets: histogram_seconds_buckets(0.000_128, 8.0),
44 )),
45 e2e_optimization_time_seconds_log_threshold,
46 outer_join_lowering_cases: registry.register(metric!(
47 name: "outer_join_lowering_cases",
48 help: "How many times the different outer join lowering cases happened.",
49 var_labels: ["case"],
50 )),
51 transform_hits: registry.register(metric!(
52 name: "transform_hits",
53 help: "How many times a given transform changed the plan.",
54 var_labels: ["transform"],
55 )),
56 transform_total: registry.register(metric!(
57 name: "transform_total",
58 help: "How many times a given transform was applied.",
59 var_labels: ["transform"],
60 )),
61 transform_time_seconds: RefCell::new(std::collections::BTreeMap::new()),
62 }
63 }
64
65 pub fn observe_e2e_optimization_time(&self, object_type: &str, duration: Duration) {
66 self.e2e_optimization_time_seconds
67 .with_label_values(&[object_type])
68 .observe(duration.as_secs_f64());
69 let debug_threshold = cfg!(debug_assertions);
71 let threshold = if debug_threshold {
72 self.e2e_optimization_time_seconds_log_threshold * 6
78 } else {
79 self.e2e_optimization_time_seconds_log_threshold
80 };
81 if duration > threshold {
82 let transform_times = self
83 .transform_time_seconds
84 .take()
85 .into_iter()
86 .map(|(k, v)| {
87 (
88 k,
89 v.into_iter()
90 .map(|duration| duration.as_micros())
91 .collect::<Vec<_>>(),
92 )
93 })
94 .collect::<Vec<_>>();
95 let threshold_string = if debug_threshold {
96 format!("{}ms (debug)", threshold.as_millis())
97 } else {
98 format!("{}ms", threshold.as_millis())
99 };
100 tracing::warn!(
101 duration = format!("{}ms", duration.as_millis()),
102 threshold = threshold_string,
103 object_type = object_type,
104 transform_times_μs = serde_json::to_string(&transform_times)
105 .unwrap_or_else(|_| format!("{:?}", transform_times)),
106 "slow optimization",
107 );
108 }
109 }
110
111 pub fn inc_outer_join_lowering(&self, case: &str) {
112 self.outer_join_lowering_cases
113 .with_label_values(&[case])
114 .inc()
115 }
116
117 pub fn inc_transform(&self, hit: bool, transform: &str) {
118 if hit {
119 self.transform_hits.with_label_values(&[transform]).inc();
120 }
121 self.transform_total.with_label_values(&[transform]).inc();
122 }
123
124 pub fn observe_transform_time(&self, transform: &str, duration: Duration) {
125 let mut transform_time_seconds = self.transform_time_seconds.borrow_mut();
126 if let Some(times) = transform_time_seconds.get_mut(transform) {
127 times.push(duration);
128 } else {
129 transform_time_seconds.insert(transform.to_string(), vec![duration]);
130 }
131 }
132}