1use std::fmt;
45use std::fmt::{Debug, Formatter};
46use std::future::Future;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex};
49use std::task::{Context, Poll};
50use std::time::{Duration, Instant};
51
52use derivative::Derivative;
53use pin_project::pin_project;
54use prometheus::core::{
55 Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
56 GenericGauge, GenericGaugeVec,
57};
58use prometheus::proto::MetricFamily;
59use prometheus::{HistogramOpts, Registry};
60
61mod delete_on_drop;
62
63pub use delete_on_drop::*;
64pub use prometheus::Opts as PrometheusOpts;
65
/// Builds a `MakeCollectorOpts` describing a metric, ready to be passed to
/// `MetricsRegistry::register`.
///
/// `name` and `help` are required and must come first. The optional
/// `subsystem`, `const_labels`, `var_labels`, and `buckets` arguments may
/// follow, in that order, and a trailing comma is accepted. `buckets` is only
/// meaningful for histogram collectors.
///
/// ```ignore
/// let opts = metric!(
///     name: "mz_example_seconds",
///     help: "An example histogram.",
///     const_labels: {"runtime" => "main"},
///     var_labels: ["action"],
///     buckets: vec![0.1, 1.0, 10.0],
/// );
/// ```
#[macro_export]
macro_rules! metric {
    (
        name: $name:expr,
        help: $help:expr
        $(, subsystem: $subsystem_name:expr)?
        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
        $(, var_labels: [ $($vl_name:expr),* ])?
        $(, buckets: $bk_name:expr)?
        $(,)?
    ) => {{
        // Key/value pairs collected into whatever map type
        // `PrometheusOpts::const_labels` accepts.
        let const_labels = (&[
            $($(
                ($cl_key.to_string(), $cl_value.to_string()),
            )*)?
        ]).iter().cloned().collect();
        let var_labels = vec![
            $(
                $($vl_name.into(),)*
            )?];
        // `mut` is only exercised when `buckets` is supplied.
        #[allow(unused_mut)]
        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
            opts: $crate::metrics::PrometheusOpts::new($name, $help)
                $(.subsystem( $subsystem_name ))?
                .const_labels(const_labels)
                .variable_labels(var_labels),
            buckets: None,
        };
        // `buckets` is an optional (`?`) argument, so this expands at most once.
        $(mk_opts.buckets = Some($bk_name);)?
        mk_opts
    }}
}
100
101#[derive(Debug, Clone)]
103pub struct MakeCollectorOpts {
104 pub opts: PrometheusOpts,
106 pub buckets: Option<Vec<f64>>,
109}
110
111#[derive(Clone, Derivative)]
113#[derivative(Debug)]
114pub struct MetricsRegistry {
115 inner: Registry,
116 #[derivative(Debug = "ignore")]
117 postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
118}
119
120#[derive(Clone)]
128pub struct DeleteOnDropWrapper<M> {
129 inner: M,
130}
131
132impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
133 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
134 self.inner.fmt(f)
135 }
136}
137
138impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
139 fn desc(&self) -> Vec<&Desc> {
140 self.inner.desc()
141 }
142
143 fn collect(&self) -> Vec<MetricFamily> {
144 self.inner.collect()
145 }
146}
147
148impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
149 fn make_collector(opts: MakeCollectorOpts) -> Self {
150 DeleteOnDropWrapper {
151 inner: M::make_collector(opts),
152 }
153 }
154}
155
156impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
157 pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
159 &self,
160 labels: L,
161 ) -> DeleteOnDropMetric<M, L> {
162 self.inner.get_delete_on_drop_metric(labels)
163 }
164}
165
166pub type UIntGauge = GenericGauge<AtomicU64>;
169
170pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
172pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
174pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
176pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
178pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
180pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
182pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
184
185use crate::assert_none;
186
187pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
188
189pub mod raw {
191 use prometheus::core::{AtomicU64, GenericGaugeVec};
192
193 pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
196
197 pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
198}
199
200impl MetricsRegistry {
201 pub fn new() -> Self {
203 MetricsRegistry {
204 inner: Registry::new(),
205 postprocessors: Arc::new(Mutex::new(vec![])),
206 }
207 }
208
209 pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
211 where
212 M: MakeCollector,
213 {
214 let collector = M::make_collector(opts);
215 self.inner.register(Box::new(collector.clone())).unwrap();
216 collector
217 }
218
219 pub fn register_computed_gauge<P>(
221 &self,
222 opts: MakeCollectorOpts,
223 f: impl Fn() -> P::T + Send + Sync + 'static,
224 ) -> ComputedGenericGauge<P>
225 where
226 P: Atomic + 'static,
227 {
228 let gauge = ComputedGenericGauge {
229 gauge: GenericGauge::make_collector(opts),
230 f: Arc::new(f),
231 };
232 self.inner.register(Box::new(gauge.clone())).unwrap();
233 gauge
234 }
235
236 pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
238 self.inner
239 .register(Box::new(collector))
240 .expect("registering pre-defined metrics collector");
241 }
242
243 pub fn register_postprocessor<F>(&self, f: F)
248 where
249 F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
250 {
251 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
252 postprocessors.push(Box::new(f));
253 }
254
255 pub fn gather(&self) -> Vec<MetricFamily> {
263 let mut metrics = self.inner.gather();
264 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
265 for postprocessor in &mut *postprocessors {
266 postprocessor(&mut metrics);
267 }
268 metrics
269 }
270}
271
272pub trait MakeCollector: Collector + Clone + 'static {
277 fn make_collector(opts: MakeCollectorOpts) -> Self;
279}
280
281impl<T> MakeCollector for GenericCounter<T>
282where
283 T: Atomic + 'static,
284{
285 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
286 assert_none!(mk_opts.buckets);
287 Self::with_opts(mk_opts.opts).expect("defining a counter")
288 }
289}
290
291impl<T> MakeCollector for GenericCounterVec<T>
292where
293 T: Atomic + 'static,
294{
295 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
296 assert_none!(mk_opts.buckets);
297 let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
298 let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
299 Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
300 }
301}
302
303impl<T> MakeCollector for GenericGauge<T>
304where
305 T: Atomic + 'static,
306{
307 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
308 assert_none!(mk_opts.buckets);
309 Self::with_opts(mk_opts.opts).expect("defining a gauge")
310 }
311}
312
313impl<T> MakeCollector for GenericGaugeVec<T>
314where
315 T: Atomic + 'static,
316{
317 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
318 assert_none!(mk_opts.buckets);
319 let labels = mk_opts.opts.variable_labels.clone();
320 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
321 Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
322 }
323}
324
325impl MakeCollector for Histogram {
326 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
327 assert!(mk_opts.buckets.is_some());
328 Self::with_opts(HistogramOpts {
329 common_opts: mk_opts.opts,
330 buckets: mk_opts.buckets.unwrap(),
331 })
332 .expect("defining a histogram")
333 }
334}
335
336impl MakeCollector for raw::HistogramVec {
337 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
338 assert!(mk_opts.buckets.is_some());
339 let labels = mk_opts.opts.variable_labels.clone();
340 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
341 Self::new(
342 HistogramOpts {
343 common_opts: mk_opts.opts,
344 buckets: mk_opts.buckets.unwrap(),
345 },
346 labels,
347 )
348 .expect("defining a histogram vec")
349 }
350}
351
352pub struct ComputedGenericGauge<P>
354where
355 P: Atomic,
356{
357 gauge: GenericGauge<P>,
358 f: Arc<dyn Fn() -> P::T + Send + Sync>,
359}
360
361impl<P> fmt::Debug for ComputedGenericGauge<P>
362where
363 P: Atomic + fmt::Debug,
364{
365 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
366 f.debug_struct("ComputedGenericGauge")
367 .field("gauge", &self.gauge)
368 .finish_non_exhaustive()
369 }
370}
371
372impl<P> Clone for ComputedGenericGauge<P>
373where
374 P: Atomic,
375{
376 fn clone(&self) -> ComputedGenericGauge<P> {
377 ComputedGenericGauge {
378 gauge: self.gauge.clone(),
379 f: Arc::clone(&self.f),
380 }
381 }
382}
383
384impl<T> Collector for ComputedGenericGauge<T>
385where
386 T: Atomic,
387{
388 fn desc(&self) -> Vec<&prometheus::core::Desc> {
389 self.gauge.desc()
390 }
391
392 fn collect(&self) -> Vec<MetricFamily> {
393 self.gauge.set((self.f)());
394 self.gauge.collect()
395 }
396}
397
398impl<P> ComputedGenericGauge<P>
399where
400 P: Atomic,
401{
402 pub fn get(&self) -> P::T {
404 (self.f)()
405 }
406}
407
408pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;
410
411pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;
413
414pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
416
417pub trait MetricsFutureExt<F> {
419 fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;
454
455 fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
492}
493
494impl<F: Future> MetricsFutureExt<F> for F {
495 fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
496 WallTimeFuture {
497 fut: self,
498 metric: UnspecifiedMetric(()),
499 start: None,
500 filter: None,
501 }
502 }
503
504 fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
505 ExecTimeFuture {
506 fut: self,
507 metric: UnspecifiedMetric(()),
508 running_duration: Duration::from_millis(0),
509 filter: None,
510 }
511 }
512}
513
514#[must_use = "futures do nothing unless you `.await` or poll them"]
516#[pin_project]
517pub struct WallTimeFuture<F, Metric> {
518 #[pin]
520 fut: F,
521 metric: Metric,
523 start: Option<Instant>,
525 filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
527}
528
529impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
530 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
531 f.debug_struct("WallTimeFuture")
532 .field("fut", &self.fut)
533 .field("metric", &self.metric)
534 .field("start", &self.start)
535 .field("filter", &self.filter.is_some())
536 .finish()
537 }
538}
539
540impl<F> WallTimeFuture<F, UnspecifiedMetric> {
541 pub fn observe(
549 self,
550 histogram: prometheus::Histogram,
551 ) -> WallTimeFuture<F, prometheus::Histogram> {
552 WallTimeFuture {
553 fut: self.fut,
554 metric: histogram,
555 start: self.start,
556 filter: self.filter,
557 }
558 }
559
560 pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
568 WallTimeFuture {
569 fut: self.fut,
570 metric: counter,
571 start: self.start,
572 filter: self.filter,
573 }
574 }
575
576 pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
578 WallTimeFuture {
579 fut: self.fut,
580 metric: place,
581 start: self.start,
582 filter: self.filter,
583 }
584 }
585}
586
587impl<F, M> WallTimeFuture<F, M> {
588 pub fn with_filter(
593 mut self,
594 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
595 ) -> Self {
596 self.filter = Some(Box::new(filter));
597 self
598 }
599}
600
601impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
602 type Output = F::Output;
603
604 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
605 let this = self.project();
606
607 if this.start.is_none() {
608 *this.start = Some(Instant::now());
609 }
610
611 let result = match this.fut.poll(cx) {
612 Poll::Ready(r) => r,
613 Poll::Pending => return Poll::Pending,
614 };
615 let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
616
617 let pass = this
618 .filter
619 .as_mut()
620 .map(|filter| filter(duration))
621 .unwrap_or(true);
622 if pass {
623 this.metric.record(duration.as_secs_f64())
624 }
625
626 Poll::Ready(result)
627 }
628}
629
630#[must_use = "futures do nothing unless you `.await` or poll them"]
632#[pin_project]
633pub struct ExecTimeFuture<F, Metric> {
634 #[pin]
636 fut: F,
637 metric: Metric,
639 running_duration: Duration,
641 filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
643}
644
645impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
646 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
647 f.debug_struct("ExecTimeFuture")
648 .field("fut", &self.fut)
649 .field("metric", &self.metric)
650 .field("running_duration", &self.running_duration)
651 .field("filter", &self.filter.is_some())
652 .finish()
653 }
654}
655
656impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
657 pub fn observe(
665 self,
666 histogram: prometheus::Histogram,
667 ) -> ExecTimeFuture<F, prometheus::Histogram> {
668 ExecTimeFuture {
669 fut: self.fut,
670 metric: histogram,
671 running_duration: self.running_duration,
672 filter: self.filter,
673 }
674 }
675
676 pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
684 ExecTimeFuture {
685 fut: self.fut,
686 metric: counter,
687 running_duration: self.running_duration,
688 filter: self.filter,
689 }
690 }
691}
692
693impl<F, M> ExecTimeFuture<F, M> {
694 pub fn with_filter(
696 mut self,
697 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
698 ) -> Self {
699 self.filter = Some(Box::new(filter));
700 self
701 }
702}
703
704impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
705 type Output = F::Output;
706
707 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
708 let this = self.project();
709
710 let start = Instant::now();
711 let result = this.fut.poll(cx);
712 let duration = Instant::now().duration_since(start);
713
714 *this.running_duration = this.running_duration.saturating_add(duration);
715
716 let result = match result {
717 Poll::Ready(result) => result,
718 Poll::Pending => return Poll::Pending,
719 };
720
721 let duration = *this.running_duration;
722 let pass = this
723 .filter
724 .as_mut()
725 .map(|filter| filter(duration))
726 .unwrap_or(true);
727 if pass {
728 this.metric.record(duration.as_secs_f64());
729 }
730
731 Poll::Ready(result)
732 }
733}
734
/// Placeholder metric type for a timing future whose destination metric has
/// not been chosen yet.
///
/// It does not implement `DurationMetric`, so a wrapper carrying it is not a
/// `Future` until a destination (such as `observe`) is picked. The private
/// `()` field keeps code outside this module from constructing it.
#[derive(Debug)]
pub struct UnspecifiedMetric(());
743
744trait DurationMetric {
748 fn record(&mut self, seconds: f64);
749}
750
751impl DurationMetric for prometheus::Histogram {
752 fn record(&mut self, seconds: f64) {
753 self.observe(seconds)
754 }
755}
756
757impl DurationMetric for prometheus::Counter {
758 fn record(&mut self, seconds: f64) {
759 self.inc_by(seconds)
760 }
761}
762
763impl DurationMetric for &'_ mut f64 {
766 fn record(&mut self, seconds: f64) {
767 **self = seconds;
768 }
769}
770
/// Registers computed gauges in `registry` that expose values read from
/// `runtime_metrics`.
///
/// Each gauge is named `mz_tokio_<method>`, after the `RuntimeMetrics` method
/// it reads, and carries a constant `runtime` label set to `name`. More gauges
/// are registered when compiled with `--cfg tokio_unstable`.
#[cfg(feature = "async")]
pub fn register_runtime_metrics(
    name: &'static str,
    runtime_metrics: tokio::runtime::RuntimeMetrics,
    registry: &MetricsRegistry,
) {
    // Expands to one computed-gauge registration per `method => help` pair.
    // Each gauge owns its own clone of `runtime_metrics` and converts the
    // method's result to `u64` whenever it is collected.
    macro_rules! register {
        ($($method:ident => $doc:literal),+ $(,)?) => {
            $({
                let source = runtime_metrics.clone();
                registry.register_computed_gauge::<prometheus::core::AtomicU64>(
                    crate::metric!(
                        name: concat!("mz_tokio_", stringify!($method)),
                        help: $doc,
                        const_labels: {"runtime" => name},
                    ),
                    move || <u64 as crate::cast::CastFrom<_>>::cast_from(source.$method()),
                );
            })+
        };
    }

    register!(
        num_workers => "The number of worker threads used by the runtime.",
        num_alive_tasks => "The current number of alive tasks in the runtime.",
        global_queue_depth => "The number of tasks currently scheduled in the runtime's global queue.",
    );

    #[cfg(tokio_unstable)]
    {
        register!(
            num_blocking_threads => "The number of additional threads spawned by the runtime.",
            num_idle_blocking_threads => "The number of idle threads which have spawned by the runtime for spawn_blocking calls.",
            spawned_tasks_count => "The number of tasks spawned in this runtime since it was created.",
            remote_schedule_count => "The number of tasks scheduled from outside of the runtime.",
            budget_forced_yield_count => "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets.",
            blocking_queue_depth => "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking.",
        );
    }
}
832
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    /// The metrics the smoke test records into, each partitioned by an
    /// `action` label.
    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        /// Registers all four test metrics in `registry`.
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)]
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // An async sleep takes ~1s of wall time but almost no time inside `poll`.
        let async_sleep_future = async {
            tokio::time::sleep(Duration::from_secs(1)).await;
        };
        runtime.block_on(
            async_sleep_future
                .wall_time()
                .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
                .exec_time()
                .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );

        let reports = registry.gather();

        // NOTE(review): the bucket indices below assume
        // `histogram_seconds_buckets(0.000_128, 8.0)` doubles from 0.000128s,
        // so bucket 3 is ~1ms and bucket 12 is ~0.52s. Confirm in `crate::stats`.
        let exec_family = reports
            .iter()
            .find(|m| m.get_name() == "exec_time_hist")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].get_value(), "async_sleep_e");

        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        assert_eq!(exec_histogram.get_bucket()[3].get_cumulative_count(), 1);

        let wall_family = reports
            .iter()
            .find(|m| m.get_name() == "wall_time_hist")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].get_value(), "async_sleep_w");

        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        assert_eq!(wall_histogram.get_bucket()[12].get_cumulative_count(), 0);

        // Fresh registry so the second scenario's metrics start empty.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // A blocking sleep runs inside `poll`, so it counts as execution time.
        let thread_sleep_future = async {
            std::thread::sleep(Duration::from_secs(1));
        };
        runtime.block_on(
            thread_sleep_future
                .wall_time()
                .with_filter(|duration| duration < Duration::from_millis(10))
                .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
                .exec_time()
                .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.get_name() == "exec_time_cnt")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].get_value(), "thread_sleep_e");

        let exec_counter = exec_metric[0].get_counter();
        assert!(exec_counter.get_value() >= 1.0);

        // The wall-time filter rejects the ~1s duration, so the counter child
        // exists (created by `with_label_values`) but is never incremented.
        let wall_family = reports
            .iter()
            .find(|m| m.get_name() == "wall_time_cnt")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].get_value(), "thread_sleep_w");

        let wall_counter = wall_metric[0].get_counter();
        assert_eq!(wall_counter.get_value(), 0.0);
    }
}