mz_sql/
optimizer_metrics.rs
1use std::cell::RefCell;
13use std::time::Duration;
14
15use mz_ore::metric;
16use mz_ore::metrics::MetricsRegistry;
17use mz_ore::stats::histogram_seconds_buckets;
18use prometheus::{HistogramVec, IntCounterVec};
19
20#[derive(Debug, Clone)]
22pub struct OptimizerMetrics {
23 e2e_optimization_time_seconds: HistogramVec,
24 e2e_optimization_time_seconds_log_threshold: Duration,
25 outer_join_lowering_cases: IntCounterVec,
26 transform_hits: IntCounterVec,
27 transform_total: IntCounterVec,
28 transform_time_seconds: RefCell<std::collections::BTreeMap<String, Vec<Duration>>>,
31}
32
33impl OptimizerMetrics {
34 pub fn register_into(
35 registry: &MetricsRegistry,
36 e2e_optimization_time_seconds_log_threshold: Duration,
37 ) -> Self {
38 Self {
39 e2e_optimization_time_seconds: registry.register(metric!(
40 name: "mz_optimizer_e2e_optimization_time_seconds",
41 help: "A histogram of end-to-end optimization times since restart.",
42 var_labels: ["object_type"],
43 buckets: histogram_seconds_buckets(0.000_128, 8.0),
44 )),
45 e2e_optimization_time_seconds_log_threshold,
46 outer_join_lowering_cases: registry.register(metric!(
47 name: "outer_join_lowering_cases",
48 help: "How many times the different outer join lowering cases happened.",
49 var_labels: ["case"],
50 )),
51 transform_hits: registry.register(metric!(
52 name: "transform_hits",
53 help: "How many times a given transform changed the plan.",
54 var_labels: ["transform"],
55 )),
56 transform_total: registry.register(metric!(
57 name: "transform_total",
58 help: "How many times a given transform was applied.",
59 var_labels: ["transform"],
60 )),
61 transform_time_seconds: RefCell::new(std::collections::BTreeMap::new()),
62 }
63 }
64
65 pub fn observe_e2e_optimization_time(&self, object_type: &str, duration: Duration) {
66 self.e2e_optimization_time_seconds
67 .with_label_values(&[object_type])
68 .observe(duration.as_secs_f64());
69 if duration > self.e2e_optimization_time_seconds_log_threshold {
70 let transform_times = self
71 .transform_time_seconds
72 .take()
73 .into_iter()
74 .map(|(k, v)| {
75 (
76 k,
77 v.into_iter()
78 .map(|duration| duration.as_micros())
79 .collect::<Vec<_>>(),
80 )
81 })
82 .collect::<Vec<_>>();
83 tracing::warn!(
84 duration = format!("{}ms", duration.as_millis()),
85 threshold = format!(
86 "{}ms",
87 self.e2e_optimization_time_seconds_log_threshold.as_millis()
88 ),
89 object_type = object_type,
90 transform_times_μs = serde_json::to_string(&transform_times)
91 .unwrap_or_else(|_| format!("{:?}", transform_times)),
92 "slow optimization",
93 );
94 }
95 }
96
97 pub fn inc_outer_join_lowering(&self, case: &str) {
98 self.outer_join_lowering_cases
99 .with_label_values(&[case])
100 .inc()
101 }
102
103 pub fn inc_transform(&self, hit: bool, transform: &str) {
104 if hit {
105 self.transform_hits.with_label_values(&[transform]).inc();
106 }
107 self.transform_total.with_label_values(&[transform]).inc();
108 }
109
110 pub fn observe_transform_time(&self, transform: &str, duration: Duration) {
111 let mut transform_time_seconds = self.transform_time_seconds.borrow_mut();
112 if let Some(times) = transform_time_seconds.get_mut(transform) {
113 times.push(duration);
114 } else {
115 transform_time_seconds.insert(transform.to_string(), vec![duration]);
116 }
117 }
118}