1use std::collections::BTreeMap;
45use std::fmt;
46use std::fmt::{Debug, Formatter};
47use std::future::Future;
48use std::pin::Pin;
49use std::sync::{Arc, Mutex};
50use std::task::{Context, Poll};
51use std::time::{Duration, Instant};
52
53use derivative::Derivative;
54use pin_project::pin_project;
55use prometheus::core::{
56 Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
57 GenericGauge, GenericGaugeVec,
58};
59use prometheus::proto::MetricFamily;
60use prometheus::{HistogramOpts, Registry};
61
62mod delete_on_drop;
63mod rule;
64
65pub use delete_on_drop::*;
66pub use prometheus::Opts as PrometheusOpts;
67pub use rule::{NameLookup, ObjectName, Rule};
68
/// Builds a [`MakeCollectorOpts`] value describing a metric.
///
/// `name` and `help` are required. The remaining keys are optional but, when
/// present, must appear in this order: `subsystem`, `const_labels`,
/// `var_labels`, `buckets`, `rules`. A trailing comma is accepted.
///
/// The expansion refers to `$crate::metrics::MakeCollectorOpts` and
/// `$crate::metrics::PrometheusOpts`, so those paths must resolve in the
/// defining crate.
#[macro_export]
macro_rules! metric {
    (
        name: $name:expr,
        help: $help:expr
        $(, subsystem: $subsystem_name:expr)?
        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
        $(, var_labels: [ $($vl_name:expr),* ])?
        $(, buckets: $bk_name:expr)?
        $(, rules: [ $($rule:expr),* $(,)? ])?
        $(,)?
    ) => {{
        // Constant labels: every key and value is stringified with `to_string()`
        // and the pairs are collected into whatever collection
        // `const_labels(..)` below expects.
        let const_labels = (&[
            $($(
                ($cl_key.to_string(), $cl_value.to_string()),
            )*)?
        ]).into_iter().cloned().collect();
        // Variable label names, converted with `.into()`.
        let var_labels = vec![
            $(
                $($vl_name.into(),)*
            )?];
        // `mut` is only needed when a `buckets:` key was supplied.
        #[allow(unused_mut)]
        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
            opts: $crate::metrics::PrometheusOpts::new($name, $help)
                $(.subsystem( $subsystem_name ))?
                .const_labels(const_labels)
                .variable_labels(var_labels),
            buckets: None,
            rules: vec![ $($($rule),*)? ],
        };
        // Only emitted when `buckets:` was supplied.
        $(mk_opts.buckets = Some($bk_name);)*
        mk_opts
    }}
}
105
/// Options consumed by [`MakeCollector::make_collector`] to build a collector.
///
/// Usually produced by the `metric!` macro.
#[derive(Debug, Clone)]
pub struct MakeCollectorOpts {
    /// The common Prometheus options for the metric.
    pub opts: PrometheusOpts,
    /// Histogram bucket boundaries.
    ///
    /// The histogram collector impls in this file require `Some`; the
    /// counter and gauge impls assert that this is `None`.
    pub buckets: Option<Vec<f64>>,
    /// Rules attached to this metric. When non-empty, the registry records
    /// them under the metric's fully-qualified name at registration time.
    pub rules: Vec<Rule>,
}
119
/// A metrics registry wrapping a Prometheus [`Registry`].
///
/// In addition to the wrapped registry it keeps a list of postprocessors that
/// are applied to every gathered result, and the [`Rule`]s that were attached
/// to metrics when they were registered. The postprocessor list and the rule
/// map sit behind `Arc<Mutex<..>>`, so clones of a `MetricsRegistry` share them.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct MetricsRegistry {
    /// The underlying Prometheus registry.
    inner: Registry,
    /// Callbacks run, in registration order, over the metric families returned
    /// by `gather`. Excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
    /// Rules recorded at registration time, keyed by fully-qualified metric name.
    rules_by_metric: Arc<Mutex<BTreeMap<String, Vec<Rule>>>>,
}
132
/// A thin wrapper around a metric (vector) type `M`.
///
/// It forwards `Debug`, `Collector` and `MakeCollector` to the wrapped value
/// and exposes `get_delete_on_drop_metric` for wrapped types that implement
/// `MetricVecExt`.
#[derive(Clone)]
pub struct DeleteOnDropWrapper<M> {
    /// The wrapped metric.
    inner: M,
}
144
145impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
146 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
147 self.inner.fmt(f)
148 }
149}
150
151impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
152 fn desc(&self) -> Vec<&Desc> {
153 self.inner.desc()
154 }
155
156 fn collect(&self) -> Vec<MetricFamily> {
157 self.inner.collect()
158 }
159}
160
161impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
162 fn make_collector(opts: MakeCollectorOpts) -> Self {
163 DeleteOnDropWrapper {
164 inner: M::make_collector(opts),
165 }
166 }
167}
168
impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
    /// Returns the [`DeleteOnDropMetric`] for the given `labels` by forwarding
    /// to the wrapped metric vector's `MetricVecExt::get_delete_on_drop_metric`.
    // NOTE(review): the name suggests the returned handle removes its label
    // set from the vector when dropped; that behavior lives in the
    // `delete_on_drop` module and is not visible here — confirm there.
    pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
        &self,
        labels: L,
    ) -> DeleteOnDropMetric<M, L> {
        self.inner.get_delete_on_drop_metric(labels)
    }
}
178
/// A gauge backed by an unsigned 64-bit atomic.
pub type UIntGauge = GenericGauge<AtomicU64>;

/// [`prometheus::CounterVec`] wrapped in a [`DeleteOnDropWrapper`].
pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
/// [`prometheus::Gauge`] wrapped in a [`DeleteOnDropWrapper`].
pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
/// [`prometheus::GaugeVec`] wrapped in a [`DeleteOnDropWrapper`].
pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
/// [`prometheus::HistogramVec`] wrapped in a [`DeleteOnDropWrapper`].
pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
/// [`prometheus::IntCounterVec`] wrapped in a [`DeleteOnDropWrapper`].
pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
/// [`prometheus::IntGaugeVec`] wrapped in a [`DeleteOnDropWrapper`].
pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
/// [`raw::UIntGaugeVec`] wrapped in a [`DeleteOnDropWrapper`].
pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
197
198use crate::assert_none;
199
200pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
201
/// The underlying Prometheus metric types, without the
/// `DeleteOnDropWrapper` that the parent module's aliases add.
pub mod raw {
    use prometheus::core::{AtomicU64, GenericGaugeVec};

    /// A gauge vector backed by unsigned 64-bit atomics.
    pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

    pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
}
212
213impl MetricsRegistry {
214 pub fn new() -> Self {
216 MetricsRegistry {
217 inner: Registry::new(),
218 postprocessors: Arc::new(Mutex::new(vec![])),
219 rules_by_metric: Arc::new(Mutex::new(BTreeMap::new())),
220 }
221 }
222
223 pub fn rules_by_metric(&self) -> BTreeMap<String, Vec<Rule>> {
226 self.rules_by_metric.lock().expect("lock poisoned").clone()
227 }
228
229 fn record_per_metric_rules(&self, opts: &MakeCollectorOpts) {
232 if opts.rules.is_empty() {
233 return;
234 }
235 let fq_name = opts.opts.fq_name();
236 self.rules_by_metric
237 .lock()
238 .expect("lock poisoned")
239 .entry(fq_name)
240 .or_default()
241 .extend(opts.rules.iter().cloned());
242 }
243
244 pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
246 where
247 M: MakeCollector,
248 {
249 self.record_per_metric_rules(&opts);
250 let collector = M::make_collector(opts);
251 self.inner.register(Box::new(collector.clone())).unwrap();
252 collector
253 }
254
255 pub fn register_computed_gauge<P>(
257 &self,
258 opts: MakeCollectorOpts,
259 f: impl Fn() -> P::T + Send + Sync + 'static,
260 ) -> ComputedGenericGauge<P>
261 where
262 P: Atomic + 'static,
263 {
264 self.record_per_metric_rules(&opts);
265 let gauge = ComputedGenericGauge {
266 gauge: GenericGauge::make_collector(opts),
267 f: Arc::new(f),
268 };
269 self.inner.register(Box::new(gauge.clone())).unwrap();
270 gauge
271 }
272
273 pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
275 self.inner
276 .register(Box::new(collector))
277 .expect("registering pre-defined metrics collector");
278 }
279
280 pub fn register_postprocessor<F>(&self, f: F)
285 where
286 F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
287 {
288 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
289 postprocessors.push(Box::new(f));
290 }
291
292 pub fn gather(&self) -> Vec<MetricFamily> {
300 let mut metrics = self.inner.gather();
301 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
302 for postprocessor in &mut *postprocessors {
303 postprocessor(&mut metrics);
304 }
305 metrics
306 }
307}
308
/// A collector that can be constructed from [`MakeCollectorOpts`].
///
/// Implemented for the metric types that `MetricsRegistry::register` can
/// create.
pub trait MakeCollector: Collector + Clone + 'static {
    /// Creates a new collector from the given options.
    ///
    /// The implementations in this file panic when the options do not fit the
    /// collector kind (for example, buckets supplied for a counter).
    fn make_collector(opts: MakeCollectorOpts) -> Self;
}
317
318impl<T> MakeCollector for GenericCounter<T>
319where
320 T: Atomic + 'static,
321{
322 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
323 assert_none!(mk_opts.buckets);
324 Self::with_opts(mk_opts.opts).expect("defining a counter")
325 }
326}
327
328impl<T> MakeCollector for GenericCounterVec<T>
329where
330 T: Atomic + 'static,
331{
332 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
333 assert_none!(mk_opts.buckets);
334 let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
335 let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
336 Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
337 }
338}
339
340impl<T> MakeCollector for GenericGauge<T>
341where
342 T: Atomic + 'static,
343{
344 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
345 assert_none!(mk_opts.buckets);
346 Self::with_opts(mk_opts.opts).expect("defining a gauge")
347 }
348}
349
350impl<T> MakeCollector for GenericGaugeVec<T>
351where
352 T: Atomic + 'static,
353{
354 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
355 assert_none!(mk_opts.buckets);
356 let labels = mk_opts.opts.variable_labels.clone();
357 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
358 Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
359 }
360}
361
362impl MakeCollector for Histogram {
363 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
364 assert!(mk_opts.buckets.is_some());
365 Self::with_opts(HistogramOpts {
366 common_opts: mk_opts.opts,
367 buckets: mk_opts.buckets.unwrap(),
368 })
369 .expect("defining a histogram")
370 }
371}
372
373impl MakeCollector for raw::HistogramVec {
374 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
375 assert!(mk_opts.buckets.is_some());
376 let labels = mk_opts.opts.variable_labels.clone();
377 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
378 Self::new(
379 HistogramOpts {
380 common_opts: mk_opts.opts,
381 buckets: mk_opts.buckets.unwrap(),
382 },
383 labels,
384 )
385 .expect("defining a histogram vec")
386 }
387}
388
/// A gauge whose value is computed by a closure.
///
/// The closure is invoked each time the gauge is collected (its result is
/// stored into the wrapped gauge first) and each time `get` is called.
pub struct ComputedGenericGauge<P>
where
    P: Atomic,
{
    /// The wrapped gauge that is reported to Prometheus.
    gauge: GenericGauge<P>,
    /// Computes the current value of the gauge.
    f: Arc<dyn Fn() -> P::T + Send + Sync>,
}
397
398impl<P> fmt::Debug for ComputedGenericGauge<P>
399where
400 P: Atomic + fmt::Debug,
401{
402 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
403 f.debug_struct("ComputedGenericGauge")
404 .field("gauge", &self.gauge)
405 .finish_non_exhaustive()
406 }
407}
408
409impl<P> Clone for ComputedGenericGauge<P>
410where
411 P: Atomic,
412{
413 fn clone(&self) -> ComputedGenericGauge<P> {
414 ComputedGenericGauge {
415 gauge: self.gauge.clone(),
416 f: Arc::clone(&self.f),
417 }
418 }
419}
420
421impl<T> Collector for ComputedGenericGauge<T>
422where
423 T: Atomic,
424{
425 fn desc(&self) -> Vec<&prometheus::core::Desc> {
426 self.gauge.desc()
427 }
428
429 fn collect(&self) -> Vec<MetricFamily> {
430 self.gauge.set((self.f)());
431 self.gauge.collect()
432 }
433}
434
435impl<P> ComputedGenericGauge<P>
436where
437 P: Atomic,
438{
439 pub fn get(&self) -> P::T {
441 (self.f)()
442 }
443}
444
/// A [`ComputedGenericGauge`] backed by an `f64` atomic.
pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;

/// A [`ComputedGenericGauge`] backed by an `i64` atomic.
pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;

/// A [`ComputedGenericGauge`] backed by a `u64` atomic.
pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
453
/// Extension methods for timing a future and reporting the result to a metric.
pub trait MetricsFutureExt<F> {
    /// Wraps the future in a [`WallTimeFuture`], which measures the time from
    /// the first poll until the future completes.
    ///
    /// The returned wrapper has no metric attached yet; choose one with
    /// `observe`, `inc_by` or `set_at`.
    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;

    /// Wraps the future in an [`ExecTimeFuture`], which accumulates only the
    /// time spent inside calls to `poll`.
    ///
    /// The returned wrapper has no metric attached yet; choose one with
    /// `observe` or `inc_by`.
    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
}
530
531impl<F: Future> MetricsFutureExt<F> for F {
532 fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
533 WallTimeFuture {
534 fut: self,
535 metric: UnspecifiedMetric(()),
536 start: None,
537 filter: None,
538 }
539 }
540
541 fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
542 ExecTimeFuture {
543 fut: self,
544 metric: UnspecifiedMetric(()),
545 running_duration: Duration::from_millis(0),
546 filter: None,
547 }
548 }
549}
550
/// A future adapter that measures the wall-clock time between its first poll
/// and its completion, and records it into `Metric`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct WallTimeFuture<F, Metric> {
    /// The wrapped future.
    #[pin]
    fut: F,
    /// Where the measured duration is recorded.
    metric: Metric,
    /// The instant of the first poll; `None` until then.
    start: Option<Instant>,
    /// Optional predicate; when present, the duration is recorded only if it
    /// returns `true`.
    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
}
565
566impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
567 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
568 f.debug_struct("WallTimeFuture")
569 .field("fut", &self.fut)
570 .field("metric", &self.metric)
571 .field("start", &self.start)
572 .field("filter", &self.filter.is_some())
573 .finish()
574 }
575}
576
577impl<F> WallTimeFuture<F, UnspecifiedMetric> {
578 pub fn observe(
586 self,
587 histogram: prometheus::Histogram,
588 ) -> WallTimeFuture<F, prometheus::Histogram> {
589 WallTimeFuture {
590 fut: self.fut,
591 metric: histogram,
592 start: self.start,
593 filter: self.filter,
594 }
595 }
596
597 pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
605 WallTimeFuture {
606 fut: self.fut,
607 metric: counter,
608 start: self.start,
609 filter: self.filter,
610 }
611 }
612
613 pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
615 WallTimeFuture {
616 fut: self.fut,
617 metric: place,
618 start: self.start,
619 filter: self.filter,
620 }
621 }
622}
623
624impl<F, M> WallTimeFuture<F, M> {
625 pub fn with_filter(
630 mut self,
631 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
632 ) -> Self {
633 self.filter = Some(Box::new(filter));
634 self
635 }
636}
637
638impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
639 type Output = F::Output;
640
641 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
642 let this = self.project();
643
644 if this.start.is_none() {
645 *this.start = Some(Instant::now());
646 }
647
648 let result = match this.fut.poll(cx) {
649 Poll::Ready(r) => r,
650 Poll::Pending => return Poll::Pending,
651 };
652 let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
653
654 let pass = this
655 .filter
656 .as_mut()
657 .map(|filter| filter(duration))
658 .unwrap_or(true);
659 if pass {
660 this.metric.record(duration.as_secs_f64())
661 }
662
663 Poll::Ready(result)
664 }
665}
666
/// A future adapter that accumulates the time spent inside calls to `poll`
/// on the wrapped future and records the total into `Metric` on completion.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct ExecTimeFuture<F, Metric> {
    /// The wrapped future.
    #[pin]
    fut: F,
    /// Where the accumulated duration is recorded.
    metric: Metric,
    /// Total time spent in `poll` so far.
    running_duration: Duration,
    /// Optional predicate; when present, the duration is recorded only if it
    /// returns `true`.
    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
}
681
682impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
683 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
684 f.debug_struct("ExecTimeFuture")
685 .field("fut", &self.fut)
686 .field("metric", &self.metric)
687 .field("running_duration", &self.running_duration)
688 .field("filter", &self.filter.is_some())
689 .finish()
690 }
691}
692
693impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
694 pub fn observe(
702 self,
703 histogram: prometheus::Histogram,
704 ) -> ExecTimeFuture<F, prometheus::Histogram> {
705 ExecTimeFuture {
706 fut: self.fut,
707 metric: histogram,
708 running_duration: self.running_duration,
709 filter: self.filter,
710 }
711 }
712
713 pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
721 ExecTimeFuture {
722 fut: self.fut,
723 metric: counter,
724 running_duration: self.running_duration,
725 filter: self.filter,
726 }
727 }
728}
729
730impl<F, M> ExecTimeFuture<F, M> {
731 pub fn with_filter(
733 mut self,
734 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
735 ) -> Self {
736 self.filter = Some(Box::new(filter));
737 self
738 }
739}
740
741impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
742 type Output = F::Output;
743
744 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
745 let this = self.project();
746
747 let start = Instant::now();
748 let result = this.fut.poll(cx);
749 let duration = Instant::now().duration_since(start);
750
751 *this.running_duration = this.running_duration.saturating_add(duration);
752
753 let result = match result {
754 Poll::Ready(result) => result,
755 Poll::Pending => return Poll::Pending,
756 };
757
758 let duration = *this.running_duration;
759 let pass = this
760 .filter
761 .as_mut()
762 .map(|filter| filter(duration))
763 .unwrap_or(true);
764 if pass {
765 this.metric.record(duration.as_secs_f64());
766 }
767
768 Poll::Ready(result)
769 }
770}
771
/// Placeholder metric type for a timing future that has not been given a
/// metric yet.
///
/// The private unit field prevents construction outside this module. It does
/// not implement `DurationMetric`, so a future still carrying it does not get
/// the `Future` impls defined in this file.
#[derive(Debug)]
pub struct UnspecifiedMetric(());
780
/// A sink that the timing futures report a measured duration to.
trait DurationMetric {
    /// Records a duration given in seconds.
    fn record(&mut self, seconds: f64);
}
787
788impl DurationMetric for prometheus::Histogram {
789 fn record(&mut self, seconds: f64) {
790 self.observe(seconds)
791 }
792}
793
794impl DurationMetric for prometheus::Counter {
795 fn record(&mut self, seconds: f64) {
796 self.inc_by(seconds)
797 }
798}
799
800impl DurationMetric for &'_ mut f64 {
803 fn record(&mut self, seconds: f64) {
804 **self = seconds;
805 }
806}
807
/// Registers computed gauges for a Tokio runtime's metrics in `registry`.
///
/// Each gauge is named `mz_tokio_<method>`, where `<method>` is the
/// `RuntimeMetrics` method it reads, and carries the constant label
/// `runtime = name`. The value is read from `runtime_metrics` whenever the
/// gauge is computed.
///
/// The gauges inside the `#[cfg(tokio_unstable)]` block are only registered
/// when that cfg is set.
///
/// # Panics
///
/// Panics if the registry rejects one of the gauges (see
/// `MetricsRegistry::register_computed_gauge`).
#[cfg(feature = "async")]
pub fn register_runtime_metrics(
    name: &'static str,
    runtime_metrics: tokio::runtime::RuntimeMetrics,
    registry: &MetricsRegistry,
) {
    // Registers a `u64` gauge that reports `runtime_metrics.$method()`.
    macro_rules! register {
        ($method:ident, $doc:literal) => {
            let metrics = runtime_metrics.clone();
            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
                crate::metric!(
                    name: concat!("mz_tokio_", stringify!($method)),
                    help: $doc,
                    const_labels: {"runtime" => name},
                ),
                move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
            );
        };
    }

    // Registers a `u64` gauge that reports the sum of
    // `runtime_metrics.$method(i)` over all workers `0..num_workers()`.
    macro_rules! register_per_worker {
        ($method:ident, $doc:literal) => {
            let metrics = runtime_metrics.clone();
            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
                crate::metric!(
                    name: concat!("mz_tokio_", stringify!($method)),
                    help: $doc,
                    const_labels: {"runtime" => name},
                ),
                move || {
                    (0..metrics.num_workers())
                        .map(|i| <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method(i)))
                        .sum::<u64>()
                },
            );
        };
    }

    // Registers an `f64` gauge that reports the sum, in seconds, of the
    // durations returned by `runtime_metrics.$method(i)` over all workers.
    macro_rules! register_per_worker_duration_secs {
        ($method:ident, $doc:literal) => {
            let metrics = runtime_metrics.clone();
            registry.register_computed_gauge::<prometheus::core::AtomicF64>(
                crate::metric!(
                    name: concat!("mz_tokio_", stringify!($method)),
                    help: $doc,
                    const_labels: {"runtime" => name},
                ),
                move || {
                    (0..metrics.num_workers())
                        .map(|i| metrics.$method(i).as_secs_f64())
                        .sum::<f64>()
                },
            );
        };
    }

    register!(
        num_workers,
        "The number of worker threads used by the runtime."
    );
    register!(
        num_alive_tasks,
        "The current number of alive tasks in the runtime."
    );
    register!(
        global_queue_depth,
        "The number of tasks currently scheduled in the runtime's global queue."
    );
    register_per_worker_duration_secs!(
        worker_total_busy_duration,
        "The amount of time the worker threads have been busy, in seconds."
    );
    register_per_worker!(
        worker_park_count,
        "The total number of times the worker threads have parked."
    );
    register_per_worker!(
        worker_park_unpark_count,
        "The total number of times the worker threads have parked and unparked."
    );

    #[cfg(tokio_unstable)]
    {
        register!(
            num_blocking_threads,
            "The number of additional threads spawned by the runtime."
        );
        register!(
            num_idle_blocking_threads,
            "The number of idle threads which have spawned by the runtime for spawn_blocking calls."
        );
        register_per_worker!(
            worker_local_queue_depth,
            "The number of tasks currently scheduled in the workers' local queues."
        );
        register!(
            blocking_queue_depth,
            "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
        );
        register!(
            spawned_tasks_count,
            "The number of tasks spawned in this runtime since it was created."
        );
        register!(
            remote_schedule_count,
            "The number of tasks scheduled from outside of the runtime."
        );
        register!(
            budget_forced_yield_count,
            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
        );
        register_per_worker!(
            worker_noop_count,
            "The number of times the given worker thread unparked but performed no work before parking again."
        );
        register_per_worker!(
            worker_steal_count,
            "The number of tasks the given worker thread stole from another worker thread."
        );
        register_per_worker!(
            worker_steal_operations,
            "The number of times the given worker thread stole tasks from another worker thread."
        );
        register_per_worker!(
            worker_poll_count,
            "The number of tasks the given worker thread has polled."
        );
        register_per_worker!(
            worker_local_schedule_count,
            "The number of tasks scheduled from within the runtime on the given worker's local queue."
        );
        register_per_worker!(
            worker_overflow_count,
            "The number of times the given worker thread saturated its local queue."
        );
        // NOTE(review): this reports the SUM of the per-worker mean poll
        // times, which is not itself a mean — confirm that is intended.
        register_per_worker_duration_secs!(
            worker_mean_poll_time,
            "The mean duration of task polls in seconds."
        );
    }
}
950
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    /// The metrics exercised by the timing-future smoke test.
    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        /// Registers all four test metrics in `registry`.
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    /// Exercises `wall_time` and `exec_time` with histograms, counters and a
    /// filter, and checks what ends up in the registry.
    #[crate::test]
    #[cfg_attr(miri, ignore)]
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // An async sleep: the future is pending for most of the two seconds.
        let async_sleep_future = async {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        };
        runtime.block_on(
            async_sleep_future
                .wall_time()
                .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
                .exec_time()
                .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_hist")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "async_sleep_e");

        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_hist")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].value(), "async_sleep_w");

        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        assert_eq!(wall_histogram.get_bucket()[12].cumulative_count(), 0);

        // Use a fresh registry for the counter-based half of the test.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // A blocking sleep: the whole second is spent inside `poll`.
        let thread_sleep_future = async {
            std::thread::sleep(std::time::Duration::from_secs(1));
        };
        runtime.block_on(
            thread_sleep_future
                .wall_time()
                .with_filter(|duration| duration < Duration::from_millis(10))
                .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
                .exec_time()
                .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_cnt")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "thread_sleep_e");

        let exec_counter = exec_metric[0].get_counter();
        assert!(exec_counter.value() >= 1.0);

        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_cnt")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);

        // The filter only accepts durations under 10ms, so the one-second
        // measurement is never added to the counter.
        let wall_counter = wall_metric[0].get_counter();
        assert_eq!(wall_counter.value(), 0.0);
    }

    /// Rules passed through `metric!` are stored under the metric's
    /// fully-qualified name, in the order given.
    #[crate::test]
    fn register_metric_stores_rules() {
        let registry = MetricsRegistry::new();
        let cluster_rule = super::Rule::ClusterNameLookup {
            cluster_id_label: "cluster_id".into(),
            output_label: "cluster_name".into(),
        };
        let replica_rule = super::Rule::ReplicaNameLookup {
            cluster_id_label: "cluster_id".into(),
            replica_id_label: "replica_id".into(),
            output_label: "replica_name".into(),
        };
        let _: prometheus::IntCounter = registry.register(crate::metric!(
            name: "mz_test_register_metric_stores_rules",
            help: "test metric",
            rules: [
                cluster_rule.clone(),
                replica_rule.clone(),
            ],
        ));
        let by_metric = registry.rules_by_metric();
        let rules = by_metric
            .get("mz_test_register_metric_stores_rules")
            .expect("rules registered under fq name");
        assert_eq!(rules, &vec![cluster_rule, replica_rule]);
    }

    /// A metric registered without rules leaves the rule map empty.
    #[crate::test]
    fn register_metric_without_rules_omits_entry() {
        let registry = MetricsRegistry::new();
        let _: prometheus::IntCounter = registry.register(crate::metric!(
            name: "mz_test_register_metric_without_rules",
            help: "test metric without rules",
        ));
        assert!(registry.rules_by_metric().is_empty());
    }
}