mz_sql/
optimizer_metrics.rs1use std::time::Duration;
13
14use mz_ore::metric;
15use mz_ore::metrics::MetricsRegistry;
16use mz_ore::stats::histogram_seconds_buckets;
17use prometheus::{HistogramVec, IntCounterVec};
18
19#[derive(Debug, Clone)]
21pub struct OptimizerMetrics {
22 e2e_optimization_time_seconds: HistogramVec,
23 e2e_optimization_time_seconds_log_threshold: Duration,
24 outer_join_lowering_cases: IntCounterVec,
25 transform_hits: IntCounterVec,
26 transform_total: IntCounterVec,
27 transform_time_seconds: std::collections::BTreeMap<String, Vec<Duration>>,
30}
31
32impl OptimizerMetrics {
33 pub fn register_into(
34 registry: &MetricsRegistry,
35 e2e_optimization_time_seconds_log_threshold: Duration,
36 ) -> Self {
37 Self {
38 e2e_optimization_time_seconds: registry.register(metric!(
39 name: "mz_optimizer_e2e_optimization_time_seconds",
40 help: "A histogram of end-to-end optimization times since restart.",
41 var_labels: ["object_type"],
42 buckets: histogram_seconds_buckets(0.000_128, 8.0),
43 )),
44 e2e_optimization_time_seconds_log_threshold,
45 outer_join_lowering_cases: registry.register(metric!(
46 name: "outer_join_lowering_cases",
47 help: "How many times the different outer join lowering cases happened.",
48 var_labels: ["case"],
49 )),
50 transform_hits: registry.register(metric!(
51 name: "transform_hits",
52 help: "How many times a given transform changed the plan.",
53 var_labels: ["transform"],
54 )),
55 transform_total: registry.register(metric!(
56 name: "transform_total",
57 help: "How many times a given transform was applied.",
58 var_labels: ["transform"],
59 )),
60 transform_time_seconds: std::collections::BTreeMap::new(),
61 }
62 }
63
64 pub fn observe_e2e_optimization_time(&self, object_type: &str, duration: Duration) {
65 self.e2e_optimization_time_seconds
66 .with_label_values(&[object_type])
67 .observe(duration.as_secs_f64());
68 let debug_threshold = cfg!(debug_assertions);
70 let threshold = if debug_threshold {
71 self.e2e_optimization_time_seconds_log_threshold * 6
77 } else {
78 self.e2e_optimization_time_seconds_log_threshold
79 };
80 if duration > threshold {
81 let transform_times = self
82 .transform_time_seconds
83 .iter()
84 .map(|(k, v)| {
85 (
86 k,
87 v.into_iter()
88 .map(|duration| duration.as_micros())
89 .collect::<Vec<_>>(),
90 )
91 })
92 .collect::<Vec<_>>();
93 let threshold_string = if debug_threshold {
94 format!("{}ms (debug)", threshold.as_millis())
95 } else {
96 format!("{}ms", threshold.as_millis())
97 };
98 tracing::warn!(
99 duration = format!("{}ms", duration.as_millis()),
100 threshold = threshold_string,
101 object_type = object_type,
102 transform_times_μs = serde_json::to_string(&transform_times)
103 .unwrap_or_else(|_| format!("{:?}", transform_times)),
104 "slow optimization",
105 );
106 }
107 }
108
109 pub fn inc_outer_join_lowering(&self, case: &str) {
110 self.outer_join_lowering_cases
111 .with_label_values(&[case])
112 .inc()
113 }
114
115 pub fn inc_transform(&self, hit: bool, transform: &str) {
116 if hit {
117 self.transform_hits.with_label_values(&[transform]).inc();
118 }
119 self.transform_total.with_label_values(&[transform]).inc();
120 }
121
122 pub fn observe_transform_time(&mut self, transform: &str, duration: Duration) {
123 let transform_time_seconds = &mut self.transform_time_seconds;
124 if let Some(times) = transform_time_seconds.get_mut(transform) {
125 times.push(duration);
126 } else {
127 transform_time_seconds.insert(transform.to_string(), vec![duration]);
128 }
129 }
130}