1use std::fmt;
45use std::fmt::{Debug, Formatter};
46use std::future::Future;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex};
49use std::task::{Context, Poll};
50use std::time::{Duration, Instant};
51
52use derivative::Derivative;
53use pin_project::pin_project;
54use prometheus::core::{
55 Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
56 GenericGauge, GenericGaugeVec,
57};
58use prometheus::proto::MetricFamily;
59use prometheus::{HistogramOpts, Registry};
60
61mod delete_on_drop;
62
63pub use delete_on_drop::*;
64pub use prometheus::Opts as PrometheusOpts;
65
66#[macro_export]
68macro_rules! metric {
69 (
70 name: $name:expr,
71 help: $help:expr
72 $(, subsystem: $subsystem_name:expr)?
73 $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
74 $(, var_labels: [ $($vl_name:expr),* ])?
75 $(, buckets: $bk_name:expr)?
76 $(, visibility: $visibility:expr)?
77 $(, tags: [ $($tag:expr),* $(,)? ])?
78 $(,)?
79 ) => {{
80 let const_labels = (&[
81 $($(
82 ($cl_key.to_string(), $cl_value.to_string()),
83 )*)?
84 ]).into_iter().cloned().collect();
85 let var_labels = vec![
86 $(
87 $($vl_name.into(),)*
88 )?];
89 #[allow(unused_mut)]
90 let mut mk_opts = $crate::metrics::MakeCollectorOpts {
91 opts: $crate::metrics::PrometheusOpts::new($name, $help)
92 $(.subsystem( $subsystem_name ))?
93 .const_labels(const_labels)
94 .variable_labels(var_labels),
95 buckets: None,
96 };
97 $(mk_opts.buckets = Some($bk_name);)*
99 $(let _: $crate::metrics::MetricVisibility = $visibility;)?
104 $($(let _: $crate::metrics::MetricTag = $tag;)*)?
107 mk_opts
108 }}
109}
110
111#[derive(Debug, Clone)]
113pub struct MakeCollectorOpts {
114 pub opts: PrometheusOpts,
116 pub buckets: Option<Vec<f64>>,
119}
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize)]
127#[serde(rename_all = "snake_case")]
128pub enum MetricVisibility {
129 #[default]
131 Internal,
132 Public,
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
147#[serde(rename_all = "kebab-case")]
148pub enum MetricTag {
149 Environment,
151 Compute,
153 Source,
155 Sink,
157}
158
159#[derive(Clone, Derivative)]
161#[derivative(Debug)]
162pub struct MetricsRegistry {
163 inner: Registry,
164 #[derivative(Debug = "ignore")]
165 postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
166}
167
168#[derive(Clone)]
176pub struct DeleteOnDropWrapper<M> {
177 inner: M,
178}
179
180impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
181 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
182 self.inner.fmt(f)
183 }
184}
185
186impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
187 fn desc(&self) -> Vec<&Desc> {
188 self.inner.desc()
189 }
190
191 fn collect(&self) -> Vec<MetricFamily> {
192 self.inner.collect()
193 }
194}
195
196impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
197 fn make_collector(opts: MakeCollectorOpts) -> Self {
198 DeleteOnDropWrapper {
199 inner: M::make_collector(opts),
200 }
201 }
202}
203
204impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
205 pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
207 &self,
208 labels: L,
209 ) -> DeleteOnDropMetric<M, L> {
210 self.inner.get_delete_on_drop_metric(labels)
211 }
212}
213
214pub type UIntGauge = GenericGauge<AtomicU64>;
217
218pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
220pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
222pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
224pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
226pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
228pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
230pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
232
233use crate::assert_none;
234
235pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
236
237pub mod raw {
239 use prometheus::core::{AtomicU64, GenericGaugeVec};
240
241 pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
244
245 pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
246}
247
248impl MetricsRegistry {
249 pub fn new() -> Self {
251 MetricsRegistry {
252 inner: Registry::new(),
253 postprocessors: Arc::new(Mutex::new(vec![])),
254 }
255 }
256
257 pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
259 where
260 M: MakeCollector,
261 {
262 let collector = M::make_collector(opts);
263 self.inner.register(Box::new(collector.clone())).unwrap();
264 collector
265 }
266
267 pub fn register_computed_gauge<P>(
269 &self,
270 opts: MakeCollectorOpts,
271 f: impl Fn() -> P::T + Send + Sync + 'static,
272 ) -> ComputedGenericGauge<P>
273 where
274 P: Atomic + 'static,
275 {
276 let gauge = ComputedGenericGauge {
277 gauge: GenericGauge::make_collector(opts),
278 f: Arc::new(f),
279 };
280 self.inner.register(Box::new(gauge.clone())).unwrap();
281 gauge
282 }
283
284 pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
286 self.inner
287 .register(Box::new(collector))
288 .expect("registering pre-defined metrics collector");
289 }
290
291 pub fn register_postprocessor<F>(&self, f: F)
296 where
297 F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
298 {
299 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
300 postprocessors.push(Box::new(f));
301 }
302
303 pub fn gather(&self) -> Vec<MetricFamily> {
311 let mut metrics = self.inner.gather();
312 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
313 for postprocessor in &mut *postprocessors {
314 postprocessor(&mut metrics);
315 }
316 metrics
317 }
318}
319
320pub trait MakeCollector: Collector + Clone + 'static {
325 fn make_collector(opts: MakeCollectorOpts) -> Self;
327}
328
329impl<T> MakeCollector for GenericCounter<T>
330where
331 T: Atomic + 'static,
332{
333 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
334 assert_none!(mk_opts.buckets);
335 Self::with_opts(mk_opts.opts).expect("defining a counter")
336 }
337}
338
339impl<T> MakeCollector for GenericCounterVec<T>
340where
341 T: Atomic + 'static,
342{
343 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
344 assert_none!(mk_opts.buckets);
345 let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
346 let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
347 Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
348 }
349}
350
351impl<T> MakeCollector for GenericGauge<T>
352where
353 T: Atomic + 'static,
354{
355 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
356 assert_none!(mk_opts.buckets);
357 Self::with_opts(mk_opts.opts).expect("defining a gauge")
358 }
359}
360
361impl<T> MakeCollector for GenericGaugeVec<T>
362where
363 T: Atomic + 'static,
364{
365 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
366 assert_none!(mk_opts.buckets);
367 let labels = mk_opts.opts.variable_labels.clone();
368 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
369 Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
370 }
371}
372
373impl MakeCollector for Histogram {
374 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
375 assert!(mk_opts.buckets.is_some());
376 Self::with_opts(HistogramOpts {
377 common_opts: mk_opts.opts,
378 buckets: mk_opts.buckets.unwrap(),
379 })
380 .expect("defining a histogram")
381 }
382}
383
384impl MakeCollector for raw::HistogramVec {
385 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
386 assert!(mk_opts.buckets.is_some());
387 let labels = mk_opts.opts.variable_labels.clone();
388 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
389 Self::new(
390 HistogramOpts {
391 common_opts: mk_opts.opts,
392 buckets: mk_opts.buckets.unwrap(),
393 },
394 labels,
395 )
396 .expect("defining a histogram vec")
397 }
398}
399
400pub struct ComputedGenericGauge<P>
402where
403 P: Atomic,
404{
405 gauge: GenericGauge<P>,
406 f: Arc<dyn Fn() -> P::T + Send + Sync>,
407}
408
409impl<P> fmt::Debug for ComputedGenericGauge<P>
410where
411 P: Atomic + fmt::Debug,
412{
413 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
414 f.debug_struct("ComputedGenericGauge")
415 .field("gauge", &self.gauge)
416 .finish_non_exhaustive()
417 }
418}
419
420impl<P> Clone for ComputedGenericGauge<P>
421where
422 P: Atomic,
423{
424 fn clone(&self) -> ComputedGenericGauge<P> {
425 ComputedGenericGauge {
426 gauge: self.gauge.clone(),
427 f: Arc::clone(&self.f),
428 }
429 }
430}
431
432impl<T> Collector for ComputedGenericGauge<T>
433where
434 T: Atomic,
435{
436 fn desc(&self) -> Vec<&prometheus::core::Desc> {
437 self.gauge.desc()
438 }
439
440 fn collect(&self) -> Vec<MetricFamily> {
441 self.gauge.set((self.f)());
442 self.gauge.collect()
443 }
444}
445
446impl<P> ComputedGenericGauge<P>
447where
448 P: Atomic,
449{
450 pub fn get(&self) -> P::T {
452 (self.f)()
453 }
454}
455
456pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;
458
459pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;
461
462pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
464
465pub trait MetricsFutureExt<F> {
467 fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;
502
503 fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
540}
541
542impl<F: Future> MetricsFutureExt<F> for F {
543 fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
544 WallTimeFuture {
545 fut: self,
546 metric: UnspecifiedMetric(()),
547 start: None,
548 filter: None,
549 }
550 }
551
552 fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
553 ExecTimeFuture {
554 fut: self,
555 metric: UnspecifiedMetric(()),
556 running_duration: Duration::from_millis(0),
557 filter: None,
558 }
559 }
560}
561
562#[must_use = "futures do nothing unless you `.await` or poll them"]
564#[pin_project]
565pub struct WallTimeFuture<F, Metric> {
566 #[pin]
568 fut: F,
569 metric: Metric,
571 start: Option<Instant>,
573 filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
575}
576
577impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
578 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
579 f.debug_struct("WallTimeFuture")
580 .field("fut", &self.fut)
581 .field("metric", &self.metric)
582 .field("start", &self.start)
583 .field("filter", &self.filter.is_some())
584 .finish()
585 }
586}
587
588impl<F> WallTimeFuture<F, UnspecifiedMetric> {
589 pub fn observe(
597 self,
598 histogram: prometheus::Histogram,
599 ) -> WallTimeFuture<F, prometheus::Histogram> {
600 WallTimeFuture {
601 fut: self.fut,
602 metric: histogram,
603 start: self.start,
604 filter: self.filter,
605 }
606 }
607
608 pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
616 WallTimeFuture {
617 fut: self.fut,
618 metric: counter,
619 start: self.start,
620 filter: self.filter,
621 }
622 }
623
624 pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
626 WallTimeFuture {
627 fut: self.fut,
628 metric: place,
629 start: self.start,
630 filter: self.filter,
631 }
632 }
633}
634
635impl<F, M> WallTimeFuture<F, M> {
636 pub fn with_filter(
641 mut self,
642 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
643 ) -> Self {
644 self.filter = Some(Box::new(filter));
645 self
646 }
647}
648
649impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
650 type Output = F::Output;
651
652 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
653 let this = self.project();
654
655 if this.start.is_none() {
656 *this.start = Some(Instant::now());
657 }
658
659 let result = match this.fut.poll(cx) {
660 Poll::Ready(r) => r,
661 Poll::Pending => return Poll::Pending,
662 };
663 let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
664
665 let pass = this
666 .filter
667 .as_mut()
668 .map(|filter| filter(duration))
669 .unwrap_or(true);
670 if pass {
671 this.metric.record(duration.as_secs_f64())
672 }
673
674 Poll::Ready(result)
675 }
676}
677
678#[must_use = "futures do nothing unless you `.await` or poll them"]
680#[pin_project]
681pub struct ExecTimeFuture<F, Metric> {
682 #[pin]
684 fut: F,
685 metric: Metric,
687 running_duration: Duration,
689 filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
691}
692
693impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
694 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
695 f.debug_struct("ExecTimeFuture")
696 .field("fut", &self.fut)
697 .field("metric", &self.metric)
698 .field("running_duration", &self.running_duration)
699 .field("filter", &self.filter.is_some())
700 .finish()
701 }
702}
703
704impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
705 pub fn observe(
713 self,
714 histogram: prometheus::Histogram,
715 ) -> ExecTimeFuture<F, prometheus::Histogram> {
716 ExecTimeFuture {
717 fut: self.fut,
718 metric: histogram,
719 running_duration: self.running_duration,
720 filter: self.filter,
721 }
722 }
723
724 pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
732 ExecTimeFuture {
733 fut: self.fut,
734 metric: counter,
735 running_duration: self.running_duration,
736 filter: self.filter,
737 }
738 }
739}
740
741impl<F, M> ExecTimeFuture<F, M> {
742 pub fn with_filter(
744 mut self,
745 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
746 ) -> Self {
747 self.filter = Some(Box::new(filter));
748 self
749 }
750}
751
752impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
753 type Output = F::Output;
754
755 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
756 let this = self.project();
757
758 let start = Instant::now();
759 let result = this.fut.poll(cx);
760 let duration = Instant::now().duration_since(start);
761
762 *this.running_duration = this.running_duration.saturating_add(duration);
763
764 let result = match result {
765 Poll::Ready(result) => result,
766 Poll::Pending => return Poll::Pending,
767 };
768
769 let duration = *this.running_duration;
770 let pass = this
771 .filter
772 .as_mut()
773 .map(|filter| filter(duration))
774 .unwrap_or(true);
775 if pass {
776 this.metric.record(duration.as_secs_f64());
777 }
778
779 Poll::Ready(result)
780 }
781}
782
783#[derive(Debug)]
790pub struct UnspecifiedMetric(());
791
792trait DurationMetric {
796 fn record(&mut self, seconds: f64);
797}
798
799impl DurationMetric for prometheus::Histogram {
800 fn record(&mut self, seconds: f64) {
801 self.observe(seconds)
802 }
803}
804
805impl DurationMetric for prometheus::Counter {
806 fn record(&mut self, seconds: f64) {
807 self.inc_by(seconds)
808 }
809}
810
811impl DurationMetric for &'_ mut f64 {
814 fn record(&mut self, seconds: f64) {
815 **self = seconds;
816 }
817}
818
819#[cfg(feature = "async")]
821pub fn register_runtime_metrics(
822 name: &'static str,
823 runtime_metrics: tokio::runtime::RuntimeMetrics,
824 registry: &MetricsRegistry,
825) {
826 macro_rules! register {
827 ($method:ident, $doc:literal) => {
828 let metrics = runtime_metrics.clone();
829 registry.register_computed_gauge::<prometheus::core::AtomicU64>(
830 crate::metric!(
831 name: concat!("mz_tokio_", stringify!($method)),
832 help: $doc,
833 const_labels: {"runtime" => name},
834 ),
835 move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
836 );
837 };
838 }
839
840 macro_rules! register_per_worker {
841 ($method:ident, $doc:literal) => {
842 let metrics = runtime_metrics.clone();
843 registry.register_computed_gauge::<prometheus::core::AtomicU64>(
844 crate::metric!(
845 name: concat!("mz_tokio_", stringify!($method)),
846 help: $doc,
847 const_labels: {"runtime" => name},
848 ),
849 move || {
850 (0..metrics.num_workers())
851 .map(|i| <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method(i)))
852 .sum::<u64>()
853 },
854 );
855 };
856 }
857
858 macro_rules! register_per_worker_duration_secs {
859 ($method:ident, $doc:literal) => {
860 let metrics = runtime_metrics.clone();
861 registry.register_computed_gauge::<prometheus::core::AtomicF64>(
862 crate::metric!(
863 name: concat!("mz_tokio_", stringify!($method)),
864 help: $doc,
865 const_labels: {"runtime" => name},
866 ),
867 move || {
868 (0..metrics.num_workers())
869 .map(|i| metrics.$method(i).as_secs_f64())
870 .sum::<f64>()
871 },
872 );
873 };
874 }
875
876 register!(
877 num_workers,
878 "The number of worker threads used by the runtime."
879 );
880 register!(
881 num_alive_tasks,
882 "The current number of alive tasks in the runtime."
883 );
884 register!(
885 global_queue_depth,
886 "The number of tasks currently scheduled in the runtime's global queue."
887 );
888 register_per_worker_duration_secs!(
889 worker_total_busy_duration,
890 "The amount of time the worker threads have been busy, in seconds."
891 );
892 register_per_worker!(
893 worker_park_count,
894 "The total number of times the worker threads have parked."
895 );
896 register_per_worker!(
897 worker_park_unpark_count,
898 "The total number of times the worker threads have parked and unparked."
899 );
900
901 #[cfg(tokio_unstable)]
902 {
903 register!(
904 num_blocking_threads,
905 "The number of additional threads spawned by the runtime."
906 );
907 register!(
908 num_idle_blocking_threads,
909 "The number of idle threads which have spawned by the runtime for spawn_blocking calls."
910 );
911 register_per_worker!(
912 worker_local_queue_depth,
913 "The number of tasks currently scheduled in the workers' local queues."
914 );
915 register!(
916 blocking_queue_depth,
917 "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
918 );
919 register!(
920 spawned_tasks_count,
921 "The number of tasks spawned in this runtime since it was created."
922 );
923 register!(
924 remote_schedule_count,
925 "The number of tasks scheduled from outside of the runtime."
926 );
927 register!(
928 budget_forced_yield_count,
929 "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
930 );
931 register_per_worker!(
932 worker_noop_count,
933 "The number of times the given worker thread unparked but performed no work before parking again."
934 );
935 register_per_worker!(
936 worker_steal_count,
937 "The number of tasks the given worker thread stole from another worker thread."
938 );
939 register_per_worker!(
940 worker_steal_operations,
941 "The number of times the given worker thread stole tasks from another worker thread."
942 );
943 register_per_worker!(
944 worker_poll_count,
945 "The number of tasks the given worker thread has polled."
946 );
947 register_per_worker!(
948 worker_local_schedule_count,
949 "The number of tasks scheduled from within the runtime on the given worker's local queue."
950 );
951 register_per_worker!(
952 worker_overflow_count,
953 "The number of times the given worker thread saturated its local queue."
954 );
955 register_per_worker_duration_secs!(
956 worker_mean_poll_time,
957 "The mean duration of task polls in seconds."
958 );
959 }
960}
961
962#[cfg(feature = "async")]
965pub fn describe_runtime_metrics() -> Vec<(String, String, Vec<String>, &'static str)> {
966 let runtime = tokio::runtime::Builder::new_current_thread()
969 .build()
970 .expect("building a current-thread runtime");
971 let registry = MetricsRegistry::new();
972 register_runtime_metrics("describe", runtime.handle().metrics(), ®istry);
973 registry
974 .gather()
975 .into_iter()
976 .map(|mf| {
977 let mut labels: Vec<String> = mf
980 .get_metric()
981 .first()
982 .map(|m| m.get_label().iter().map(|l| l.name().to_owned()).collect())
983 .unwrap_or_default();
984 labels.sort();
985 labels.dedup();
986 (mf.name().to_owned(), mf.help().to_owned(), labels, file!())
987 })
988 .collect()
989}
990
991#[cfg(test)]
992mod tests {
993 use std::time::Duration;
994
995 use prometheus::{CounterVec, HistogramVec};
996
997 use crate::stats::histogram_seconds_buckets;
998
999 use super::{MetricsFutureExt, MetricsRegistry};
1000
1001 struct Metrics {
1002 pub wall_time_hist: HistogramVec,
1003 pub wall_time_cnt: CounterVec,
1004 pub exec_time_hist: HistogramVec,
1005 pub exec_time_cnt: CounterVec,
1006 }
1007
1008 impl Metrics {
1009 pub fn register_into(registry: &MetricsRegistry) -> Self {
1010 Self {
1011 wall_time_hist: registry.register(metric!(
1012 name: "wall_time_hist",
1013 help: "help",
1014 var_labels: ["action"],
1015 buckets: histogram_seconds_buckets(0.000_128, 8.0),
1016 )),
1017 wall_time_cnt: registry.register(metric!(
1018 name: "wall_time_cnt",
1019 help: "help",
1020 var_labels: ["action"],
1021 )),
1022 exec_time_hist: registry.register(metric!(
1023 name: "exec_time_hist",
1024 help: "help",
1025 var_labels: ["action"],
1026 buckets: histogram_seconds_buckets(0.000_128, 8.0),
1027 )),
1028 exec_time_cnt: registry.register(metric!(
1029 name: "exec_time_cnt",
1030 help: "help",
1031 var_labels: ["action"],
1032 )),
1033 }
1034 }
1035 }
1036
1037 #[crate::test]
1038 #[cfg_attr(miri, ignore)] fn smoke_test_metrics_future_ext() {
1040 let runtime = tokio::runtime::Builder::new_current_thread()
1041 .enable_time()
1042 .build()
1043 .expect("failed to start runtime");
1044 let registry = MetricsRegistry::new();
1045 let metrics = Metrics::register_into(®istry);
1046
1047 let async_sleep_future = async {
1049 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
1050 };
1051 runtime.block_on(
1052 async_sleep_future
1053 .wall_time()
1054 .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
1055 .exec_time()
1056 .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
1057 );
1058
1059 let reports = registry.gather();
1060
1061 let exec_family = reports
1062 .iter()
1063 .find(|m| m.name() == "exec_time_hist")
1064 .expect("metric not found");
1065 let exec_metric = exec_family.get_metric();
1066 assert_eq!(exec_metric.len(), 1);
1067 assert_eq!(exec_metric[0].get_label()[0].value(), "async_sleep_e");
1068
1069 let exec_histogram = exec_metric[0].get_histogram();
1070 assert_eq!(exec_histogram.get_sample_count(), 1);
1071 let wall_family = reports
1075 .iter()
1076 .find(|m| m.name() == "wall_time_hist")
1077 .expect("metric not found");
1078 let wall_metric = wall_family.get_metric();
1079 assert_eq!(wall_metric.len(), 1);
1080 assert_eq!(wall_metric[0].get_label()[0].value(), "async_sleep_w");
1081
1082 let wall_histogram = wall_metric[0].get_histogram();
1083 assert_eq!(wall_histogram.get_sample_count(), 1);
1084 assert_eq!(wall_histogram.get_bucket()[12].cumulative_count(), 0);
1087
1088 let registry = MetricsRegistry::new();
1090 let metrics = Metrics::register_into(®istry);
1091
1092 let thread_sleep_future = async {
1094 std::thread::sleep(std::time::Duration::from_secs(1));
1095 };
1096 runtime.block_on(
1097 thread_sleep_future
1098 .wall_time()
1099 .with_filter(|duration| duration < Duration::from_millis(10))
1100 .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
1101 .exec_time()
1102 .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
1103 );
1104
1105 let reports = registry.gather();
1106
1107 let exec_family = reports
1108 .iter()
1109 .find(|m| m.name() == "exec_time_cnt")
1110 .expect("metric not found");
1111 let exec_metric = exec_family.get_metric();
1112 assert_eq!(exec_metric.len(), 1);
1113 assert_eq!(exec_metric[0].get_label()[0].value(), "thread_sleep_e");
1114
1115 let exec_counter = exec_metric[0].get_counter();
1116 assert!(exec_counter.value() >= 1.0);
1118
1119 let wall_family = reports
1120 .iter()
1121 .find(|m| m.name() == "wall_time_cnt")
1122 .expect("metric not found");
1123 let wall_metric = wall_family.get_metric();
1124 assert_eq!(wall_metric.len(), 1);
1125
1126 let wall_counter = wall_metric[0].get_counter();
1127 assert_eq!(wall_counter.value(), 0.0);
1129 }
1130}