1use std::fmt;
45use std::fmt::{Debug, Formatter};
46use std::future::Future;
47use std::pin::Pin;
48use std::sync::{Arc, Mutex};
49use std::task::{Context, Poll};
50use std::time::{Duration, Instant};
51
52use derivative::Derivative;
53use pin_project::pin_project;
54use prometheus::core::{
55 Atomic, AtomicF64, AtomicI64, AtomicU64, Collector, Desc, GenericCounter, GenericCounterVec,
56 GenericGauge, GenericGaugeVec,
57};
58use prometheus::proto::MetricFamily;
59use prometheus::{HistogramOpts, Registry};
60
61mod delete_on_drop;
62
63pub use delete_on_drop::*;
64pub use prometheus::Opts as PrometheusOpts;
65
/// Builds a [`MakeCollectorOpts`] value from a keyword-style description of a metric.
///
/// `name:` and `help:` are required, in that order. The remaining arguments are
/// optional but, when present, must appear in this order: `subsystem:`,
/// `const_labels: { key => value, ... }`, `var_labels: [ name, ... ]`,
/// `buckets:`, `visibility:`. A trailing comma is accepted.
///
/// `visibility:` is only type-checked against `MetricVisibility`; the value is
/// not stored in the returned options.
#[macro_export]
macro_rules! metric {
    (
        name: $name:expr,
        help: $help:expr
        $(, subsystem: $subsystem_name:expr)?
        $(, const_labels: { $($cl_key:expr => $cl_value:expr ),* })?
        $(, var_labels: [ $($vl_name:expr),* ])?
        $(, buckets: $bk_name:expr)?
        $(, visibility: $visibility:expr)?
        $(,)?
    ) => {{
        // Collect the (possibly empty) constant label pairs as owned strings.
        let const_labels = (&[
            $($(
                ($cl_key.to_string(), $cl_value.to_string()),
            )*)?
        ]).into_iter().cloned().collect();
        // Collect the (possibly empty) variable label names.
        let var_labels = vec![
            $(
                $($vl_name.into(),)*
            )?];
        // `mut` is only needed when `buckets:` was supplied.
        #[allow(unused_mut)]
        let mut mk_opts = $crate::metrics::MakeCollectorOpts {
            opts: $crate::metrics::PrometheusOpts::new($name, $help)
                $(.subsystem( $subsystem_name ))?
                .const_labels(const_labels)
                .variable_labels(var_labels),
            buckets: None,
        };
        // Set buckets if passed. The fragment is matched with `?`, so it is
        // transcribed with `?` as well.
        $(mk_opts.buckets = Some($bk_name);)?
        // Type-check the visibility argument if passed; the value is discarded.
        $(let _: $crate::metrics::MetricVisibility = $visibility;)?
        mk_opts
    }}
}
106
/// Options consumed by [`MakeCollector::make_collector`] to build a collector.
#[derive(Debug, Clone)]
pub struct MakeCollectorOpts {
    /// The prometheus options (name, help, labels) for the metric.
    pub opts: PrometheusOpts,
    /// Histogram buckets. The histogram collectors in this file require this to
    /// be `Some`; the counter and gauge collectors pass it to `assert_none!`.
    pub buckets: Option<Vec<f64>>,
}
116
/// Visibility classification accepted by the `visibility:` argument of the
/// `metric!` macro.
///
/// The macro only type-checks the value against this enum. Serialized with
/// snake_case variant names.
// NOTE(review): the intended meaning of each variant is not visible in this
// file — presumably "internal-only" vs. "exposed to users"; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum MetricVisibility {
    /// The default variant.
    #[default]
    Internal,
    /// The non-default variant.
    Public,
}
133
/// A wrapper around a prometheus [`Registry`] that additionally holds a list of
/// postprocessor callbacks which are run over the gathered metric families.
///
/// The postprocessor list lives behind an `Arc<Mutex<..>>`, so clones of a
/// `MetricsRegistry` share the same list.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct MetricsRegistry {
    /// The underlying prometheus registry.
    inner: Registry,
    /// Callbacks applied, in registration order, to the output of `gather`.
    /// Skipped in the `Debug` output.
    #[derivative(Debug = "ignore")]
    postprocessors: Arc<Mutex<Vec<Box<dyn FnMut(&mut Vec<MetricFamily>) + Send + Sync>>>>,
}
142
/// A newtype around a collector `M`.
///
/// The impls in this file forward `Debug`, `Collector` and `MakeCollector` to
/// the wrapped value and expose `get_delete_on_drop_metric` when
/// `M: MetricVecExt`.
#[derive(Clone)]
pub struct DeleteOnDropWrapper<M> {
    /// The wrapped collector.
    inner: M,
}
154
155impl<M: MakeCollector + Debug> Debug for DeleteOnDropWrapper<M> {
156 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
157 self.inner.fmt(f)
158 }
159}
160
161impl<M: Collector> Collector for DeleteOnDropWrapper<M> {
162 fn desc(&self) -> Vec<&Desc> {
163 self.inner.desc()
164 }
165
166 fn collect(&self) -> Vec<MetricFamily> {
167 self.inner.collect()
168 }
169}
170
171impl<M: MakeCollector> MakeCollector for DeleteOnDropWrapper<M> {
172 fn make_collector(opts: MakeCollectorOpts) -> Self {
173 DeleteOnDropWrapper {
174 inner: M::make_collector(opts),
175 }
176 }
177}
178
impl<M: MetricVecExt> DeleteOnDropWrapper<M> {
    /// Forwards to the wrapped vector's `get_delete_on_drop_metric` with the
    /// given `labels` and returns the resulting [`DeleteOnDropMetric`].
    // NOTE(review): the behavior of the returned handle is defined by the
    // `delete_on_drop` module, which is not visible here.
    pub fn get_delete_on_drop_metric<L: PromLabelsExt>(
        &self,
        labels: L,
    ) -> DeleteOnDropMetric<M, L> {
        self.inner.get_delete_on_drop_metric(labels)
    }
}
188
/// A gauge backed by an [`AtomicU64`].
pub type UIntGauge = GenericGauge<AtomicU64>;

/// [`prometheus::CounterVec`] wrapped in [`DeleteOnDropWrapper`].
pub type CounterVec = DeleteOnDropWrapper<prometheus::CounterVec>;
/// [`prometheus::Gauge`] wrapped in [`DeleteOnDropWrapper`].
pub type Gauge = DeleteOnDropWrapper<prometheus::Gauge>;
/// [`prometheus::GaugeVec`] wrapped in [`DeleteOnDropWrapper`].
pub type GaugeVec = DeleteOnDropWrapper<prometheus::GaugeVec>;
/// [`prometheus::HistogramVec`] wrapped in [`DeleteOnDropWrapper`].
pub type HistogramVec = DeleteOnDropWrapper<prometheus::HistogramVec>;
/// [`prometheus::IntCounterVec`] wrapped in [`DeleteOnDropWrapper`].
pub type IntCounterVec = DeleteOnDropWrapper<prometheus::IntCounterVec>;
/// [`prometheus::IntGaugeVec`] wrapped in [`DeleteOnDropWrapper`].
pub type IntGaugeVec = DeleteOnDropWrapper<prometheus::IntGaugeVec>;
/// [`raw::UIntGaugeVec`] wrapped in [`DeleteOnDropWrapper`].
pub type UIntGaugeVec = DeleteOnDropWrapper<raw::UIntGaugeVec>;
207
208use crate::assert_none;
209
210pub use prometheus::{Counter, Histogram, IntCounter, IntGauge};
211
/// The unwrapped prometheus vector and gauge types, i.e. without the
/// `DeleteOnDropWrapper` that the same-named aliases in the parent module add.
pub mod raw {
    use prometheus::core::{AtomicU64, GenericGaugeVec};

    /// A gauge vector whose values are backed by an [`AtomicU64`].
    pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

    pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
}
222
223impl MetricsRegistry {
224 pub fn new() -> Self {
226 MetricsRegistry {
227 inner: Registry::new(),
228 postprocessors: Arc::new(Mutex::new(vec![])),
229 }
230 }
231
232 pub fn register<M>(&self, opts: MakeCollectorOpts) -> M
234 where
235 M: MakeCollector,
236 {
237 let collector = M::make_collector(opts);
238 self.inner.register(Box::new(collector.clone())).unwrap();
239 collector
240 }
241
242 pub fn register_computed_gauge<P>(
244 &self,
245 opts: MakeCollectorOpts,
246 f: impl Fn() -> P::T + Send + Sync + 'static,
247 ) -> ComputedGenericGauge<P>
248 where
249 P: Atomic + 'static,
250 {
251 let gauge = ComputedGenericGauge {
252 gauge: GenericGauge::make_collector(opts),
253 f: Arc::new(f),
254 };
255 self.inner.register(Box::new(gauge.clone())).unwrap();
256 gauge
257 }
258
259 pub fn register_collector<C: 'static + prometheus::core::Collector>(&self, collector: C) {
261 self.inner
262 .register(Box::new(collector))
263 .expect("registering pre-defined metrics collector");
264 }
265
266 pub fn register_postprocessor<F>(&self, f: F)
271 where
272 F: FnMut(&mut Vec<MetricFamily>) + Send + Sync + 'static,
273 {
274 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
275 postprocessors.push(Box::new(f));
276 }
277
278 pub fn gather(&self) -> Vec<MetricFamily> {
286 let mut metrics = self.inner.gather();
287 let mut postprocessors = self.postprocessors.lock().expect("lock poisoned");
288 for postprocessor in &mut *postprocessors {
289 postprocessor(&mut metrics);
290 }
291 metrics
292 }
293}
294
/// A cloneable, `'static` [`Collector`] that can be constructed from
/// [`MakeCollectorOpts`]. This is the bound `MetricsRegistry::register` places
/// on the metric types it creates.
pub trait MakeCollector: Collector + Clone + 'static {
    /// Creates a new collector from the given options.
    fn make_collector(opts: MakeCollectorOpts) -> Self;
}
303
304impl<T> MakeCollector for GenericCounter<T>
305where
306 T: Atomic + 'static,
307{
308 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
309 assert_none!(mk_opts.buckets);
310 Self::with_opts(mk_opts.opts).expect("defining a counter")
311 }
312}
313
314impl<T> MakeCollector for GenericCounterVec<T>
315where
316 T: Atomic + 'static,
317{
318 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
319 assert_none!(mk_opts.buckets);
320 let labels: Vec<String> = mk_opts.opts.variable_labels.clone();
321 let label_refs: Vec<&str> = labels.iter().map(String::as_str).collect();
322 Self::new(mk_opts.opts, label_refs.as_slice()).expect("defining a counter vec")
323 }
324}
325
326impl<T> MakeCollector for GenericGauge<T>
327where
328 T: Atomic + 'static,
329{
330 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
331 assert_none!(mk_opts.buckets);
332 Self::with_opts(mk_opts.opts).expect("defining a gauge")
333 }
334}
335
336impl<T> MakeCollector for GenericGaugeVec<T>
337where
338 T: Atomic + 'static,
339{
340 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
341 assert_none!(mk_opts.buckets);
342 let labels = mk_opts.opts.variable_labels.clone();
343 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
344 Self::new(mk_opts.opts, labels).expect("defining a gauge vec")
345 }
346}
347
348impl MakeCollector for Histogram {
349 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
350 assert!(mk_opts.buckets.is_some());
351 Self::with_opts(HistogramOpts {
352 common_opts: mk_opts.opts,
353 buckets: mk_opts.buckets.unwrap(),
354 })
355 .expect("defining a histogram")
356 }
357}
358
359impl MakeCollector for raw::HistogramVec {
360 fn make_collector(mk_opts: MakeCollectorOpts) -> Self {
361 assert!(mk_opts.buckets.is_some());
362 let labels = mk_opts.opts.variable_labels.clone();
363 let labels = &labels.iter().map(|x| x.as_str()).collect::<Vec<_>>();
364 Self::new(
365 HistogramOpts {
366 common_opts: mk_opts.opts,
367 buckets: mk_opts.buckets.unwrap(),
368 },
369 labels,
370 )
371 .expect("defining a histogram vec")
372 }
373}
374
/// A gauge whose value is produced by a closure.
///
/// The closure is called, and the inner gauge updated with its result, each
/// time the gauge is collected.
pub struct ComputedGenericGauge<P>
where
    P: Atomic,
{
    /// The gauge that is exported; overwritten on every collection.
    gauge: GenericGauge<P>,
    /// Computes the current value of the gauge.
    f: Arc<dyn Fn() -> P::T + Send + Sync>,
}
383
384impl<P> fmt::Debug for ComputedGenericGauge<P>
385where
386 P: Atomic + fmt::Debug,
387{
388 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
389 f.debug_struct("ComputedGenericGauge")
390 .field("gauge", &self.gauge)
391 .finish_non_exhaustive()
392 }
393}
394
395impl<P> Clone for ComputedGenericGauge<P>
396where
397 P: Atomic,
398{
399 fn clone(&self) -> ComputedGenericGauge<P> {
400 ComputedGenericGauge {
401 gauge: self.gauge.clone(),
402 f: Arc::clone(&self.f),
403 }
404 }
405}
406
407impl<T> Collector for ComputedGenericGauge<T>
408where
409 T: Atomic,
410{
411 fn desc(&self) -> Vec<&prometheus::core::Desc> {
412 self.gauge.desc()
413 }
414
415 fn collect(&self) -> Vec<MetricFamily> {
416 self.gauge.set((self.f)());
417 self.gauge.collect()
418 }
419}
420
421impl<P> ComputedGenericGauge<P>
422where
423 P: Atomic,
424{
425 pub fn get(&self) -> P::T {
427 (self.f)()
428 }
429}
430
/// A [`ComputedGenericGauge`] backed by an [`AtomicF64`].
pub type ComputedGauge = ComputedGenericGauge<AtomicF64>;

/// A [`ComputedGenericGauge`] backed by an [`AtomicI64`].
pub type ComputedIntGauge = ComputedGenericGauge<AtomicI64>;

/// A [`ComputedGenericGauge`] backed by an [`AtomicU64`].
pub type ComputedUIntGauge = ComputedGenericGauge<AtomicU64>;
439
/// Extension methods for timing a future. Implemented in this file for every
/// `F: Future`.
pub trait MetricsFutureExt<F> {
    /// Wraps the future in a [`WallTimeFuture`], which measures the time from
    /// its first poll until the wrapped future completes.
    ///
    /// The returned wrapper has no metric attached yet; choose one with
    /// `observe`, `inc_by` or `set_at`.
    fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric>;

    /// Wraps the future in an [`ExecTimeFuture`], which sums the time spent
    /// inside each call to `poll` of the wrapped future.
    ///
    /// The returned wrapper has no metric attached yet; choose one with
    /// `observe` or `inc_by`.
    fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric>;
}
516
517impl<F: Future> MetricsFutureExt<F> for F {
518 fn wall_time(self) -> WallTimeFuture<F, UnspecifiedMetric> {
519 WallTimeFuture {
520 fut: self,
521 metric: UnspecifiedMetric(()),
522 start: None,
523 filter: None,
524 }
525 }
526
527 fn exec_time(self) -> ExecTimeFuture<F, UnspecifiedMetric> {
528 ExecTimeFuture {
529 fut: self,
530 metric: UnspecifiedMetric(()),
531 running_duration: Duration::from_millis(0),
532 filter: None,
533 }
534 }
535}
536
/// A future that records the wall-clock time between its first poll and the
/// completion of the wrapped future into `metric`.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct WallTimeFuture<F, Metric> {
    /// The wrapped future.
    #[pin]
    fut: F,
    /// Where the measured duration is recorded.
    metric: Metric,
    /// The instant of the first poll; `None` until then.
    start: Option<Instant>,
    /// Optional predicate over the measured duration; when present, the
    /// duration is recorded only if it returns `true`.
    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
}
551
552impl<F: Debug, M: Debug> fmt::Debug for WallTimeFuture<F, M> {
553 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
554 f.debug_struct("WallTimeFuture")
555 .field("fut", &self.fut)
556 .field("metric", &self.metric)
557 .field("start", &self.start)
558 .field("filter", &self.filter.is_some())
559 .finish()
560 }
561}
562
563impl<F> WallTimeFuture<F, UnspecifiedMetric> {
564 pub fn observe(
572 self,
573 histogram: prometheus::Histogram,
574 ) -> WallTimeFuture<F, prometheus::Histogram> {
575 WallTimeFuture {
576 fut: self.fut,
577 metric: histogram,
578 start: self.start,
579 filter: self.filter,
580 }
581 }
582
583 pub fn inc_by(self, counter: prometheus::Counter) -> WallTimeFuture<F, prometheus::Counter> {
591 WallTimeFuture {
592 fut: self.fut,
593 metric: counter,
594 start: self.start,
595 filter: self.filter,
596 }
597 }
598
599 pub fn set_at(self, place: &mut f64) -> WallTimeFuture<F, &mut f64> {
601 WallTimeFuture {
602 fut: self.fut,
603 metric: place,
604 start: self.start,
605 filter: self.filter,
606 }
607 }
608}
609
610impl<F, M> WallTimeFuture<F, M> {
611 pub fn with_filter(
616 mut self,
617 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
618 ) -> Self {
619 self.filter = Some(Box::new(filter));
620 self
621 }
622}
623
624impl<F: Future, M: DurationMetric> Future for WallTimeFuture<F, M> {
625 type Output = F::Output;
626
627 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
628 let this = self.project();
629
630 if this.start.is_none() {
631 *this.start = Some(Instant::now());
632 }
633
634 let result = match this.fut.poll(cx) {
635 Poll::Ready(r) => r,
636 Poll::Pending => return Poll::Pending,
637 };
638 let duration = Instant::now().duration_since(this.start.expect("timer to be started"));
639
640 let pass = this
641 .filter
642 .as_mut()
643 .map(|filter| filter(duration))
644 .unwrap_or(true);
645 if pass {
646 this.metric.record(duration.as_secs_f64())
647 }
648
649 Poll::Ready(result)
650 }
651}
652
/// A future that records the total time spent inside `poll` calls of the
/// wrapped future into `metric` once the wrapped future completes.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct ExecTimeFuture<F, Metric> {
    /// The wrapped future.
    #[pin]
    fut: F,
    /// Where the measured duration is recorded.
    metric: Metric,
    /// Time accumulated across all polls so far.
    running_duration: Duration,
    /// Optional predicate over the measured duration; when present, the
    /// duration is recorded only if it returns `true`.
    filter: Option<Box<dyn FnMut(Duration) -> bool + Send + Sync>>,
}
667
668impl<F: Debug, M: Debug> fmt::Debug for ExecTimeFuture<F, M> {
669 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
670 f.debug_struct("ExecTimeFuture")
671 .field("fut", &self.fut)
672 .field("metric", &self.metric)
673 .field("running_duration", &self.running_duration)
674 .field("filter", &self.filter.is_some())
675 .finish()
676 }
677}
678
679impl<F> ExecTimeFuture<F, UnspecifiedMetric> {
680 pub fn observe(
688 self,
689 histogram: prometheus::Histogram,
690 ) -> ExecTimeFuture<F, prometheus::Histogram> {
691 ExecTimeFuture {
692 fut: self.fut,
693 metric: histogram,
694 running_duration: self.running_duration,
695 filter: self.filter,
696 }
697 }
698
699 pub fn inc_by(self, counter: prometheus::Counter) -> ExecTimeFuture<F, prometheus::Counter> {
707 ExecTimeFuture {
708 fut: self.fut,
709 metric: counter,
710 running_duration: self.running_duration,
711 filter: self.filter,
712 }
713 }
714}
715
716impl<F, M> ExecTimeFuture<F, M> {
717 pub fn with_filter(
719 mut self,
720 filter: impl FnMut(Duration) -> bool + Send + Sync + 'static,
721 ) -> Self {
722 self.filter = Some(Box::new(filter));
723 self
724 }
725}
726
727impl<F: Future, M: DurationMetric> Future for ExecTimeFuture<F, M> {
728 type Output = F::Output;
729
730 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
731 let this = self.project();
732
733 let start = Instant::now();
734 let result = this.fut.poll(cx);
735 let duration = Instant::now().duration_since(start);
736
737 *this.running_duration = this.running_duration.saturating_add(duration);
738
739 let result = match result {
740 Poll::Ready(result) => result,
741 Poll::Pending => return Poll::Pending,
742 };
743
744 let duration = *this.running_duration;
745 let pass = this
746 .filter
747 .as_mut()
748 .map(|filter| filter(duration))
749 .unwrap_or(true);
750 if pass {
751 this.metric.record(duration.as_secs_f64());
752 }
753
754 Poll::Ready(result)
755 }
756}
757
/// Placeholder metric type carried by a freshly created `WallTimeFuture` or
/// `ExecTimeFuture` before a real metric is attached.
///
/// It does not implement `DurationMetric`, so a wrapper still carrying it does
/// not implement `Future`. The private unit field prevents construction
/// outside this module.
#[derive(Debug)]
pub struct UnspecifiedMetric(());
766
/// A sink for a measured duration. Implemented in this file for
/// `prometheus::Histogram`, `prometheus::Counter` and `&mut f64`.
trait DurationMetric {
    /// Records a duration expressed in seconds.
    fn record(&mut self, seconds: f64);
}
773
774impl DurationMetric for prometheus::Histogram {
775 fn record(&mut self, seconds: f64) {
776 self.observe(seconds)
777 }
778}
779
780impl DurationMetric for prometheus::Counter {
781 fn record(&mut self, seconds: f64) {
782 self.inc_by(seconds)
783 }
784}
785
786impl DurationMetric for &'_ mut f64 {
789 fn record(&mut self, seconds: f64) {
790 **self = seconds;
791 }
792}
793
/// Registers computed gauges for a Tokio runtime's metrics into `registry`.
///
/// Every gauge is named `mz_tokio_<method>`, where `<method>` is the
/// `RuntimeMetrics` method it reads, and carries the constant label
/// `runtime = name`. Values are read from `runtime_metrics` each time the
/// gauge is collected. Additional gauges are registered when built with
/// `--cfg tokio_unstable`.
///
/// # Panics
///
/// Panics if any gauge cannot be registered (see
/// `MetricsRegistry::register_computed_gauge`).
#[cfg(feature = "async")]
pub fn register_runtime_metrics(
    name: &'static str,
    runtime_metrics: tokio::runtime::RuntimeMetrics,
    registry: &MetricsRegistry,
) {
    // Registers a `u64` gauge that reports `runtime_metrics.$method()`.
    macro_rules! register {
        ($method:ident, $doc:literal) => {
            let metrics = runtime_metrics.clone();
            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
                crate::metric!(
                    name: concat!("mz_tokio_", stringify!($method)),
                    help: $doc,
                    const_labels: {"runtime" => name},
                ),
                move || <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method()),
            );
        };
    }

    // Registers a `u64` gauge that reports the sum of
    // `runtime_metrics.$method(worker)` over all workers.
    macro_rules! register_per_worker {
        ($method:ident, $doc:literal) => {
            let metrics = runtime_metrics.clone();
            registry.register_computed_gauge::<prometheus::core::AtomicU64>(
                crate::metric!(
                    name: concat!("mz_tokio_", stringify!($method)),
                    help: $doc,
                    const_labels: {"runtime" => name},
                ),
                move || {
                    (0..metrics.num_workers())
                        .map(|i| <u64 as crate::cast::CastFrom<_>>::cast_from(metrics.$method(i)))
                        .sum::<u64>()
                },
            );
        };
    }

    // Registers an `f64` gauge that reports, in seconds, the sum of the
    // durations `runtime_metrics.$method(worker)` over all workers.
    macro_rules! register_per_worker_duration_secs {
        ($method:ident, $doc:literal) => {
            let metrics = runtime_metrics.clone();
            registry.register_computed_gauge::<prometheus::core::AtomicF64>(
                crate::metric!(
                    name: concat!("mz_tokio_", stringify!($method)),
                    help: $doc,
                    const_labels: {"runtime" => name},
                ),
                move || {
                    (0..metrics.num_workers())
                        .map(|i| metrics.$method(i).as_secs_f64())
                        .sum::<f64>()
                },
            );
        };
    }

    // Metrics registered regardless of `tokio_unstable`.
    register!(
        num_workers,
        "The number of worker threads used by the runtime."
    );
    register!(
        num_alive_tasks,
        "The current number of alive tasks in the runtime."
    );
    register!(
        global_queue_depth,
        "The number of tasks currently scheduled in the runtime's global queue."
    );
    register_per_worker_duration_secs!(
        worker_total_busy_duration,
        "The amount of time the worker threads have been busy, in seconds."
    );
    register_per_worker!(
        worker_park_count,
        "The total number of times the worker threads have parked."
    );
    register_per_worker!(
        worker_park_unpark_count,
        "The total number of times the worker threads have parked and unparked."
    );

    // Metrics registered only when compiled with `--cfg tokio_unstable`.
    #[cfg(tokio_unstable)]
    {
        register!(
            num_blocking_threads,
            "The number of additional threads spawned by the runtime."
        );
        register!(
            num_idle_blocking_threads,
            "The number of idle threads which have spawned by the runtime for spawn_blocking calls."
        );
        register_per_worker!(
            worker_local_queue_depth,
            "The number of tasks currently scheduled in the workers' local queues."
        );
        register!(
            blocking_queue_depth,
            "The number of tasks currently scheduled in the blocking thread pool, spawned using spawn_blocking."
        );
        register!(
            spawned_tasks_count,
            "The number of tasks spawned in this runtime since it was created."
        );
        register!(
            remote_schedule_count,
            "The number of tasks scheduled from outside of the runtime."
        );
        register!(
            budget_forced_yield_count,
            "The number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets."
        );
        register_per_worker!(
            worker_noop_count,
            "The number of times the given worker thread unparked but performed no work before parking again."
        );
        register_per_worker!(
            worker_steal_count,
            "The number of tasks the given worker thread stole from another worker thread."
        );
        register_per_worker!(
            worker_steal_operations,
            "The number of times the given worker thread stole tasks from another worker thread."
        );
        register_per_worker!(
            worker_poll_count,
            "The number of tasks the given worker thread has polled."
        );
        register_per_worker!(
            worker_local_schedule_count,
            "The number of tasks scheduled from within the runtime on the given worker's local queue."
        );
        register_per_worker!(
            worker_overflow_count,
            "The number of times the given worker thread saturated its local queue."
        );
        // NOTE(review): this sums the per-worker means, so with more than one
        // worker the exported value is a sum of means rather than a mean —
        // confirm this is intended.
        register_per_worker_duration_secs!(
            worker_mean_poll_time,
            "The mean duration of task polls in seconds."
        );
    }
}
936
/// Returns `(name, help, source file)` for every metric family that
/// [`register_runtime_metrics`] registers.
///
/// A throwaway current-thread Tokio runtime and a fresh [`MetricsRegistry`]
/// are created only to have something to register against. The third tuple
/// element is `file!()`, i.e. the path of this source file.
///
/// # Panics
///
/// Panics if the temporary runtime cannot be built.
#[cfg(feature = "async")]
pub fn describe_runtime_metrics() -> Vec<(String, String, &'static str)> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("building a current-thread runtime");
    let registry = MetricsRegistry::new();
    // Pass the registry by reference; the source had a mis-encoded `&registry`.
    register_runtime_metrics("describe", runtime.handle().metrics(), &registry);
    registry
        .gather()
        .into_iter()
        .map(|mf| (mf.name().to_owned(), mf.help().to_owned(), file!()))
        .collect()
}
954
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use prometheus::{CounterVec, HistogramVec};

    use crate::stats::histogram_seconds_buckets;

    use super::{MetricsFutureExt, MetricsRegistry};

    /// The metric vectors exercised by the smoke test.
    struct Metrics {
        pub wall_time_hist: HistogramVec,
        pub wall_time_cnt: CounterVec,
        pub exec_time_hist: HistogramVec,
        pub exec_time_cnt: CounterVec,
    }

    impl Metrics {
        /// Registers all four metric vectors into `registry`.
        pub fn register_into(registry: &MetricsRegistry) -> Self {
            Self {
                wall_time_hist: registry.register(metric!(
                    name: "wall_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                wall_time_cnt: registry.register(metric!(
                    name: "wall_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
                exec_time_hist: registry.register(metric!(
                    name: "exec_time_hist",
                    help: "help",
                    var_labels: ["action"],
                    buckets: histogram_seconds_buckets(0.000_128, 8.0),
                )),
                exec_time_cnt: registry.register(metric!(
                    name: "exec_time_cnt",
                    help: "help",
                    var_labels: ["action"],
                )),
            }
        }
    }

    /// Exercises `wall_time` and `exec_time` with histograms, counters and a
    /// filter, first on an async sleep and then on a blocking thread sleep.
    #[crate::test]
    #[cfg_attr(miri, ignore)]
    fn smoke_test_metrics_future_ext() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .expect("failed to start runtime");
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // An async sleep yields to the runtime while waiting.
        let async_sleep_future = async {
            tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
        };
        runtime.block_on(
            async_sleep_future
                .wall_time()
                .observe(metrics.wall_time_hist.with_label_values(&["async_sleep_w"]))
                .exec_time()
                .observe(metrics.exec_time_hist.with_label_values(&["async_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_hist")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "async_sleep_e");

        let exec_histogram = exec_metric[0].get_histogram();
        assert_eq!(exec_histogram.get_sample_count(), 1);
        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_hist")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);
        assert_eq!(wall_metric[0].get_label()[0].value(), "async_sleep_w");

        let wall_histogram = wall_metric[0].get_histogram();
        assert_eq!(wall_histogram.get_sample_count(), 1);
        // The 2s wall time must not be counted in bucket 12; presumably that
        // bucket's upper bound is below 2s — TODO confirm against
        // `histogram_seconds_buckets`.
        assert_eq!(wall_histogram.get_bucket()[12].cumulative_count(), 0);

        // Start over with a fresh registry for the counter-based checks.
        let registry = MetricsRegistry::new();
        let metrics = Metrics::register_into(&registry);

        // A thread sleep blocks inside `poll`, so the time counts as execution
        // time.
        let thread_sleep_future = async {
            std::thread::sleep(std::time::Duration::from_secs(1));
        };
        runtime.block_on(
            thread_sleep_future
                .wall_time()
                .with_filter(|duration| duration < Duration::from_millis(10))
                .inc_by(metrics.wall_time_cnt.with_label_values(&["thread_sleep_w"]))
                .exec_time()
                .inc_by(metrics.exec_time_cnt.with_label_values(&["thread_sleep_e"])),
        );

        let reports = registry.gather();

        let exec_family = reports
            .iter()
            .find(|m| m.name() == "exec_time_cnt")
            .expect("metric not found");
        let exec_metric = exec_family.get_metric();
        assert_eq!(exec_metric.len(), 1);
        assert_eq!(exec_metric[0].get_label()[0].value(), "thread_sleep_e");

        // The full one-second blocking sleep happened inside `poll`.
        let exec_counter = exec_metric[0].get_counter();
        assert!(exec_counter.value() >= 1.0);

        let wall_family = reports
            .iter()
            .find(|m| m.name() == "wall_time_cnt")
            .expect("metric not found");
        let wall_metric = wall_family.get_metric();
        assert_eq!(wall_metric.len(), 1);

        // The filter only accepts durations under 10ms, so the ~1s wall time
        // was not recorded.
        let wall_counter = wall_metric[0].get_counter();
        assert_eq!(wall_counter.value(), 0.0);
    }
}