
mz_persist_client/internal/metrics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Prometheus monitoring metrics.

use async_stream::stream;
use mz_persist_types::stats::PartStatsMetrics;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};

use async_trait::async_trait;
use bytes::Bytes;
use futures_util::StreamExt;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::instrument;
use mz_ore::metric;
use mz_ore::metrics::{
    ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
    DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
    UIntGaugeVec, raw,
};
use mz_ore::stats::histogram_seconds_buckets;
use mz_persist::location::{
    Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
};
use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
use mz_persist::retry::RetryStream;
use mz_persist_types::Codec64;
use mz_postgres_client::metrics::PostgresClientMetrics;
use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
use prometheus::proto::MetricFamily;
use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
use timely::progress::Antichain;
use tokio_metrics::TaskMonitor;
use tracing::{Instrument, debug, info, info_span};

use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
use crate::internal::paths::BlobKey;
use crate::{PersistConfig, ShardId};

/// Prometheus monitoring metrics.
///
/// Intentionally not Clone because we expect this to be passed around in an
/// Arc.
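///
/// A minimal construction sketch (`cfg` and `registry` here are hypothetical
/// bindings for a [PersistConfig] and a [MetricsRegistry]):
/// ```ignore
/// let metrics = Arc::new(Metrics::new(&cfg, &registry));
/// let shared = Arc::clone(&metrics); // share the Arc rather than cloning Metrics
/// ```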
pub struct Metrics {
    _vecs: MetricsVecs,
    _uptime: ComputedGauge,

    /// Metrics for [Blob] usage.
    pub blob: BlobMetrics,
    /// Metrics for [Consensus] usage.
    pub consensus: ConsensusMetrics,
    /// Metrics of command evaluation.
    pub cmds: CmdsMetrics,
    /// Metrics for each retry loop.
    pub retries: RetriesMetrics,
    /// Metrics for batches written directly on behalf of a user (BatchBuilder
    /// or one of the sugar methods that use it).
    pub user: BatchWriteMetrics,
    /// Metrics for reading batch parts.
    pub read: BatchPartReadMetrics,
    /// Metrics for compaction.
    pub compaction: CompactionMetrics,
    /// Metrics for garbage collection.
    pub gc: GcMetrics,
    /// Metrics for leasing and automatic lease expiry.
    pub lease: LeaseMetrics,
    /// Metrics for various encodings and decodings.
    pub codecs: CodecsMetrics,
    /// Metrics for (incremental) state updates and fetches.
    pub state: StateMetrics,
    /// Metrics for various per-shard measurements.
    pub shards: ShardsMetrics,
    /// Metrics for auditing persist usage.
    pub audit: UsageAuditMetrics,
    /// Metrics for locking.
    pub locks: LocksMetrics,
    /// Metrics for StateWatch.
    pub watch: WatchMetrics,
    /// Metrics for PubSub client.
    pub pubsub_client: PubSubClientMetrics,
    /// Metrics for mfp/filter pushdown.
    pub pushdown: PushdownMetrics,
    /// Metrics for consolidation.
    pub consolidation: ConsolidationMetrics,
    /// Metrics for blob caching.
    pub blob_cache_mem: BlobMemCache,
    /// Metrics for tokio tasks.
    pub tasks: TasksMetrics,
    /// Metrics for columnar data encoding and decoding.
    pub columnar: ColumnarMetrics,
    /// Metrics for schemas and the schema registry.
    pub schema: SchemaMetrics,
    /// Metrics for inline writes.
    pub inline: InlineMetrics,
    /// Semaphore to limit memory/disk use by fetches.
    pub(crate) semaphore: SemaphoreMetrics,

    /// Metrics for the persist sink.
    pub sink: SinkMetrics,

    /// Metrics for the S3-backed blob implementation.
    pub s3_blob: S3BlobMetrics,
    /// Metrics for the Postgres-backed consensus implementation.
    pub postgres_consensus: PostgresClientMetrics,

    #[allow(dead_code)]
    pub(crate) registry: MetricsRegistry,
}

impl std::fmt::Debug for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Metrics").finish_non_exhaustive()
    }
}

impl Metrics {
    /// Returns a new [Metrics] instance connected to the given registry.
    pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
        let vecs = MetricsVecs::new(registry);
        let start = Instant::now();
        let uptime = registry.register_computed_gauge(
            metric!(
                name: "mz_persist_metadata_seconds",
                help: "server uptime, labels are build metadata",
                const_labels: {
                    "version" => cfg.build_version,
                    "build_type" => if cfg!(debug_assertions) { "debug" } else { "release" }
                },
            ),
            move || start.elapsed().as_secs_f64(),
        );
        let s3_blob = S3BlobMetrics::new(registry);
        let columnar = ColumnarMetrics::new(
            registry,
            &s3_blob.lgbytes,
            Arc::clone(&cfg.configs),
            cfg.is_cc_active,
        );
        Metrics {
            blob: vecs.blob_metrics(),
            consensus: vecs.consensus_metrics(),
            cmds: vecs.cmds_metrics(registry),
            retries: vecs.retries_metrics(),
            codecs: vecs.codecs_metrics(),
            user: BatchWriteMetrics::new(registry, "user"),
            read: vecs.batch_part_read_metrics(),
            compaction: CompactionMetrics::new(registry),
            gc: GcMetrics::new(registry),
            lease: LeaseMetrics::new(registry),
            state: StateMetrics::new(registry),
            shards: ShardsMetrics::new(registry),
            audit: UsageAuditMetrics::new(registry),
            locks: vecs.locks_metrics(),
            watch: WatchMetrics::new(registry),
            pubsub_client: PubSubClientMetrics::new(registry),
            pushdown: PushdownMetrics::new(registry),
            consolidation: ConsolidationMetrics::new(registry),
            blob_cache_mem: BlobMemCache::new(registry),
            tasks: TasksMetrics::new(registry),
            columnar,
            schema: SchemaMetrics::new(registry),
            inline: InlineMetrics::new(registry),
            semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
            sink: SinkMetrics::new(registry),
            s3_blob,
            postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
            _vecs: vecs,
            _uptime: uptime,
            registry: registry.clone(),
        }
    }

    /// Returns the current lifetime write amplification reflected in these
    /// metrics.
    ///
    /// Only exposed for tests, persistcli, and benchmarks.
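    ///
    /// Illustrative example (hypothetical numbers, not from a real workload):
    /// a process that has written 100 MiB of encoded blob data for 25 MiB of
    /// logical user data has a write amplification of 100 / 25 = 4.0.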
    pub fn write_amplification(&self) -> f64 {
        // This intentionally uses "bytes" for total and "goodbytes" for user so
        // that the overhead of our blob format is included.
        let total_written = self.blob.set.bytes.get();
        let user_written = self.user.goodbytes.get();
        #[allow(clippy::as_conversions)]
        {
            total_written as f64 / user_written as f64
        }
    }
}

#[derive(Debug)]
struct MetricsVecs {
    cmd_started: IntCounterVec,
    cmd_cas_mismatch: IntCounterVec,
    cmd_succeeded: IntCounterVec,
    cmd_failed: IntCounterVec,
    cmd_seconds: CounterVec,

    external_op_started: IntCounterVec,
    external_op_succeeded: IntCounterVec,
    external_op_failed: IntCounterVec,
    external_op_bytes: IntCounterVec,
    external_op_seconds: CounterVec,
    external_consensus_truncated_count: IntCounter,
    external_blob_delete_noop_count: IntCounter,
    external_blob_sizes: Histogram,
    external_rtt_latency: GaugeVec,
    external_op_latency: HistogramVec,

    retry_started: IntCounterVec,
    retry_finished: IntCounterVec,
    retry_retries: IntCounterVec,
    retry_sleep_seconds: CounterVec,

    encode_count: IntCounterVec,
    encode_seconds: CounterVec,
    decode_count: IntCounterVec,
    decode_seconds: CounterVec,

    read_part_bytes: IntCounterVec,
    read_part_goodbytes: IntCounterVec,
    read_part_count: IntCounterVec,
    read_part_seconds: CounterVec,
    read_ts_rewrite: IntCounterVec,

    lock_acquire_count: IntCounterVec,
    lock_blocking_acquire_count: IntCounterVec,
    lock_blocking_seconds: CounterVec,

    /// A minimal set of metrics imported into honeycomb for alerting.
    alerts_metrics: Arc<AlertsMetrics>,
}

impl MetricsVecs {
    fn new(registry: &MetricsRegistry) -> Self {
        MetricsVecs {
            cmd_started: registry.register(metric!(
                name: "mz_persist_cmd_started_count",
                help: "count of commands started",
                var_labels: ["cmd"],
            )),
            cmd_cas_mismatch: registry.register(metric!(
                name: "mz_persist_cmd_cas_mismatch_count",
                help: "count of command retries from CaS mismatch",
                var_labels: ["cmd"],
            )),
            cmd_succeeded: registry.register(metric!(
                name: "mz_persist_cmd_succeeded_count",
                help: "count of commands succeeded",
                var_labels: ["cmd"],
            )),
            cmd_failed: registry.register(metric!(
                name: "mz_persist_cmd_failed_count",
                help: "count of commands failed",
                var_labels: ["cmd"],
            )),
            cmd_seconds: registry.register(metric!(
                name: "mz_persist_cmd_seconds",
                help: "time spent applying commands",
                var_labels: ["cmd"],
            )),

            external_op_started: registry.register(metric!(
                name: "mz_persist_external_started_count",
                help: "count of external service calls started",
                var_labels: ["op"],
            )),
            external_op_succeeded: registry.register(metric!(
                name: "mz_persist_external_succeeded_count",
                help: "count of external service calls succeeded",
                var_labels: ["op"],
            )),
            external_op_failed: registry.register(metric!(
                name: "mz_persist_external_failed_count",
                help: "count of external service calls failed",
                var_labels: ["op"],
            )),
            external_op_bytes: registry.register(metric!(
                name: "mz_persist_external_bytes_count",
                help: "total size represented by external service calls",
                var_labels: ["op"],
            )),
            external_op_seconds: registry.register(metric!(
                name: "mz_persist_external_seconds",
                help: "time spent in external service calls",
                var_labels: ["op"],
            )),
            external_consensus_truncated_count: registry.register(metric!(
                name: "mz_persist_external_consensus_truncated_count",
                help: "count of versions deleted by consensus truncate calls",
            )),
            external_blob_delete_noop_count: registry.register(metric!(
                name: "mz_persist_external_blob_delete_noop_count",
                help: "count of blob delete calls that deleted a non-existent key",
            )),
            external_blob_sizes: registry.register(metric!(
                name: "mz_persist_external_blob_sizes",
                help: "histogram of blob sizes at put time",
                buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
            )),
            external_rtt_latency: registry.register(metric!(
                name: "mz_persist_external_rtt_latency",
                help: "roundtrip-time to external service as seen by this process",
                var_labels: ["external"],
            )),
            external_op_latency: registry.register(metric!(
                name: "mz_persist_external_op_latency",
                help: "roundtrip latency observed by individual performance-critical operations",
                var_labels: ["op"],
                // NB: If we end up overrunning metrics quotas, we could plausibly cut this
                // down by switching to a factor of 4 between buckets (vs. the standard 2).
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),

            retry_started: registry.register(metric!(
                name: "mz_persist_retry_started_count",
                help: "count of retry loops started",
                var_labels: ["op"],
            )),
            retry_finished: registry.register(metric!(
                name: "mz_persist_retry_finished_count",
                help: "count of retry loops finished",
                var_labels: ["op"],
            )),
            retry_retries: registry.register(metric!(
                name: "mz_persist_retry_retries_count",
                help: "count of total attempts by retry loops",
                var_labels: ["op"],
            )),
            retry_sleep_seconds: registry.register(metric!(
                name: "mz_persist_retry_sleep_seconds",
                help: "time spent in retry loop backoff",
                var_labels: ["op"],
            )),

            encode_count: registry.register(metric!(
                name: "mz_persist_encode_count",
                help: "count of op encodes",
                var_labels: ["op"],
            )),
            encode_seconds: registry.register(metric!(
                name: "mz_persist_encode_seconds",
                help: "time spent in op encodes",
                var_labels: ["op"],
            )),
            decode_count: registry.register(metric!(
                name: "mz_persist_decode_count",
                help: "count of op decodes",
                var_labels: ["op"],
            )),
            decode_seconds: registry.register(metric!(
                name: "mz_persist_decode_seconds",
                help: "time spent in op decodes",
                var_labels: ["op"],
            )),

            read_part_bytes: registry.register(metric!(
                name: "mz_persist_read_batch_part_bytes",
                help: "total encoded size of batch parts read",
                var_labels: ["op"],
            )),
            read_part_goodbytes: registry.register(metric!(
                name: "mz_persist_read_batch_part_goodbytes",
                help: "total logical size of batch parts read",
                var_labels: ["op"],
            )),
            read_part_count: registry.register(metric!(
                name: "mz_persist_read_batch_part_count",
                help: "count of batch parts read",
                var_labels: ["op"],
            )),
            read_part_seconds: registry.register(metric!(
                name: "mz_persist_read_batch_part_seconds",
                help: "time spent reading batch parts",
                var_labels: ["op"],
            )),
            read_ts_rewrite: registry.register(metric!(
                name: "mz_persist_read_ts_rewite",
                help: "count of updates read with rewritten ts",
                var_labels: ["op"],
            )),

            lock_acquire_count: registry.register(metric!(
                name: "mz_persist_lock_acquire_count",
                help: "count of locks acquired",
                var_labels: ["op"],
            )),
            lock_blocking_acquire_count: registry.register(metric!(
                name: "mz_persist_lock_blocking_acquire_count",
                help: "count of locks acquired that required blocking",
                var_labels: ["op"],
            )),
            lock_blocking_seconds: registry.register(metric!(
                name: "mz_persist_lock_blocking_seconds",
                help: "time spent blocked for a lock",
                var_labels: ["op"],
            )),

            alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
        }
    }

    fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
        CmdsMetrics {
            init_state: self.cmd_metrics("init_state"),
            add_rollup: self.cmd_metrics("add_rollup"),
            remove_rollups: self.cmd_metrics("remove_rollups"),
            register: self.cmd_metrics("register"),
            compare_and_append: self.cmd_metrics("compare_and_append"),
            compare_and_append_noop: registry.register(metric!(
                name: "mz_persist_cmd_compare_and_append_noop",
                help: "count of compare_and_append retries that were discovered to have already committed",
            )),
            compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
            downgrade_since: self.cmd_metrics("downgrade_since"),
            expire_reader: self.cmd_metrics("expire_reader"),
            expire_writer: self.cmd_metrics("expire_writer"),
            merge_res: self.cmd_metrics("merge_res"),
            become_tombstone: self.cmd_metrics("become_tombstone"),
            compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
            spine_exert: self.cmd_metrics("spine_exert"),
            fetch_upper_count: registry.register(metric!(
                name: "mz_persist_cmd_fetch_upper_count",
                help: "count of fetch_upper calls",
            )),
        }
    }

    fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
        CmdMetrics {
            name: cmd.to_owned(),
            started: self.cmd_started.with_label_values(&[cmd]),
            succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
            cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
            failed: self.cmd_failed.with_label_values(&[cmd]),
            seconds: self.cmd_seconds.with_label_values(&[cmd]),
        }
    }

    fn retries_metrics(&self) -> RetriesMetrics {
        RetriesMetrics {
            determinate: RetryDeterminate {
                apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
            },
            external: RetryExternal {
                batch_delete: Arc::new(self.retry_metrics("batch::delete")),
                batch_set: self.retry_metrics("batch::set"),
                blob_open: self.retry_metrics("blob::open"),
                compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
                consensus_open: self.retry_metrics("consensus::open"),
                fetch_batch_get: self.retry_metrics("fetch_batch::get"),
                fetch_state_scan: self.retry_metrics("fetch_state::scan"),
                gc_truncate: self.retry_metrics("gc::truncate"),
                maybe_init_cas: self.retry_metrics("maybe_init::cas"),
                rollup_delete: self.retry_metrics("rollup::delete"),
                rollup_get: self.retry_metrics("rollup::get"),
                rollup_set: self.retry_metrics("rollup::set"),
                hollow_run_get: self.retry_metrics("hollow_run::get"),
                hollow_run_set: self.retry_metrics("hollow_run::set"),
                storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
            },
            compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
            fetch_latest_state: self.retry_metrics("fetch_latest_state"),
            fetch_live_states: self.retry_metrics("fetch_live_states"),
            idempotent_cmd: self.retry_metrics("idempotent_cmd"),
            next_listen_batch: self.retry_metrics("next_listen_batch"),
            snapshot: self.retry_metrics("snapshot"),
        }
    }

    fn retry_metrics(&self, name: &str) -> RetryMetrics {
        RetryMetrics {
            name: name.to_owned(),
            started: self.retry_started.with_label_values(&[name]),
            finished: self.retry_finished.with_label_values(&[name]),
            retries: self.retry_retries.with_label_values(&[name]),
            sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
        }
    }

    fn codecs_metrics(&self) -> CodecsMetrics {
        CodecsMetrics {
            state: self.codec_metrics("state"),
            state_diff: self.codec_metrics("state_diff"),
            batch: self.codec_metrics("batch"),
            key: self.codec_metrics("key"),
            val: self.codec_metrics("val"),
        }
    }

    fn codec_metrics(&self, op: &str) -> CodecMetrics {
        CodecMetrics {
            encode_count: self.encode_count.with_label_values(&[op]),
            encode_seconds: self.encode_seconds.with_label_values(&[op]),
            decode_count: self.decode_count.with_label_values(&[op]),
            decode_seconds: self.decode_seconds.with_label_values(&[op]),
        }
    }

    fn blob_metrics(&self) -> BlobMetrics {
        BlobMetrics {
            set: self.external_op_metrics("blob_set", true),
            get: self.external_op_metrics("blob_get", true),
            list_keys: self.external_op_metrics("blob_list_keys", false),
            delete: self.external_op_metrics("blob_delete", false),
            restore: self.external_op_metrics("restore", false),
            delete_noop: self.external_blob_delete_noop_count.clone(),
            blob_sizes: self.external_blob_sizes.clone(),
            rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
        }
    }

    fn consensus_metrics(&self) -> ConsensusMetrics {
        ConsensusMetrics {
            list_keys: self.external_op_metrics("consensus_list_keys", false),
            head: self.external_op_metrics("consensus_head", false),
            compare_and_set: self.external_op_metrics("consensus_cas", true),
            scan: self.external_op_metrics("consensus_scan", false),
            truncate: self.external_op_metrics("consensus_truncate", false),
            truncated_count: self.external_consensus_truncated_count.clone(),
            rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
        }
    }

    fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
        ExternalOpMetrics {
            started: self.external_op_started.with_label_values(&[op]),
            succeeded: self.external_op_succeeded.with_label_values(&[op]),
            failed: self.external_op_failed.with_label_values(&[op]),
            bytes: self.external_op_bytes.with_label_values(&[op]),
            seconds: self.external_op_seconds.with_label_values(&[op]),
            seconds_histogram: if latency_histogram {
                Some(self.external_op_latency.with_label_values(&[op]))
            } else {
                None
            },
            alerts_metrics: Arc::clone(&self.alerts_metrics),
        }
    }

    fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
        BatchPartReadMetrics {
            listen: self.read_metrics("listen"),
            snapshot: self.read_metrics("snapshot"),
            batch_fetcher: self.read_metrics("batch_fetcher"),
            compaction: self.read_metrics("compaction"),
            unindexed: self.read_metrics("unindexed"),
        }
    }

    fn read_metrics(&self, op: &str) -> ReadMetrics {
        ReadMetrics {
            part_bytes: self.read_part_bytes.with_label_values(&[op]),
            part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
            part_count: self.read_part_count.with_label_values(&[op]),
            seconds: self.read_part_seconds.with_label_values(&[op]),
            ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
        }
    }

    fn locks_metrics(&self) -> LocksMetrics {
        LocksMetrics {
            applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
            applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
            applier_write: self.lock_metrics("applier_write"),
            watch: self.lock_metrics("watch"),
        }
    }

    fn lock_metrics(&self, op: &str) -> LockMetrics {
        LockMetrics {
            acquire_count: self.lock_acquire_count.with_label_values(&[op]),
            blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
            blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
        }
    }
}

#[derive(Debug)]
pub struct CmdMetrics {
    pub(crate) name: String,
    pub(crate) started: IntCounter,
    pub(crate) cas_mismatch: IntCounter,
    pub(crate) succeeded: IntCounter,
    pub(crate) failed: IntCounter,
    pub(crate) seconds: Counter,
}

impl CmdMetrics {
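    /// Runs `cmd_fn`, recording elapsed seconds plus started, succeeded, and
    /// failed counts against these metrics and the per-shard success counter.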
    pub async fn run_cmd<R, E, F, CmdFn>(
        &self,
        shard_metrics: &ShardMetrics,
        cmd_fn: CmdFn,
    ) -> Result<R, E>
    where
        F: std::future::Future<Output = Result<R, E>>,
        CmdFn: FnOnce() -> F,
    {
        self.started.inc();
        let start = Instant::now();
        let res = cmd_fn().await;
        self.seconds.inc_by(start.elapsed().as_secs_f64());
        match res.as_ref() {
            Ok(_) => {
                self.succeeded.inc();
                shard_metrics.cmd_succeeded.inc();
            }
            Err(_) => self.failed.inc(),
        };
        res
    }
}

#[derive(Debug)]
pub struct CmdsMetrics {
    pub(crate) init_state: CmdMetrics,
    pub(crate) add_rollup: CmdMetrics,
    pub(crate) remove_rollups: CmdMetrics,
    pub(crate) register: CmdMetrics,
    pub(crate) compare_and_append: CmdMetrics,
    pub(crate) compare_and_append_noop: IntCounter,
    pub(crate) compare_and_downgrade_since: CmdMetrics,
    pub(crate) downgrade_since: CmdMetrics,
    pub(crate) expire_reader: CmdMetrics,
    pub(crate) expire_writer: CmdMetrics,
    pub(crate) merge_res: CmdMetrics,
    pub(crate) become_tombstone: CmdMetrics,
    pub(crate) compare_and_evolve_schema: CmdMetrics,
    pub(crate) spine_exert: CmdMetrics,
    pub(crate) fetch_upper_count: IntCounter,
}

#[derive(Debug)]
pub struct RetryMetrics {
    pub(crate) name: String,
    pub(crate) started: IntCounter,
    pub(crate) finished: IntCounter,
    pub(crate) retries: IntCounter,
    pub(crate) sleep_seconds: Counter,
}

impl RetryMetrics {
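    /// Wraps `retry` in a [MetricsRetryStream], so that attempts, backoff
    /// sleeps, and (on drop) completion are recorded in these metrics.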
    pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
        MetricsRetryStream::new(retry, self)
    }
}

#[derive(Debug)]
pub struct RetryDeterminate {
    pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
}

#[derive(Debug)]
pub struct RetryExternal {
    pub(crate) batch_delete: Arc<RetryMetrics>,
    pub(crate) batch_set: RetryMetrics,
    pub(crate) blob_open: RetryMetrics,
    pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
    pub(crate) consensus_open: RetryMetrics,
    pub(crate) fetch_batch_get: RetryMetrics,
    pub(crate) fetch_state_scan: RetryMetrics,
    pub(crate) gc_truncate: RetryMetrics,
    pub(crate) maybe_init_cas: RetryMetrics,
    pub(crate) rollup_delete: RetryMetrics,
    pub(crate) rollup_get: RetryMetrics,
    pub(crate) rollup_set: RetryMetrics,
    pub(crate) hollow_run_get: RetryMetrics,
    pub(crate) hollow_run_set: RetryMetrics,
    pub(crate) storage_usage_shard_size: RetryMetrics,
}

#[derive(Debug)]
pub struct RetriesMetrics {
    pub(crate) determinate: RetryDeterminate,
    pub(crate) external: RetryExternal,

    pub(crate) compare_and_append_idempotent: RetryMetrics,
    pub(crate) fetch_latest_state: RetryMetrics,
    pub(crate) fetch_live_states: RetryMetrics,
    pub(crate) idempotent_cmd: RetryMetrics,
    pub(crate) next_listen_batch: RetryMetrics,
    pub(crate) snapshot: RetryMetrics,
}

#[derive(Debug)]
pub struct BatchPartReadMetrics {
    pub(crate) listen: ReadMetrics,
    pub(crate) snapshot: ReadMetrics,
    pub(crate) batch_fetcher: ReadMetrics,
    pub(crate) compaction: ReadMetrics,
    pub(crate) unindexed: ReadMetrics,
}

#[derive(Debug, Clone)]
pub struct ReadMetrics {
    pub(crate) part_bytes: IntCounter,
    pub(crate) part_goodbytes: IntCounter,
    pub(crate) part_count: IntCounter,
    pub(crate) seconds: Counter,
    pub(crate) ts_rewrite: IntCounter,
}

// This one is Clone in contrast to the others because it has to get moved into
// a task.
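//
// A hypothetical sketch of that move (`metrics` here stands in for an
// `Arc<Metrics>`): `let m = metrics.user.clone();` followed by
// `tokio::spawn(async move { m.bytes.inc_by(n) });`.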
#[derive(Debug, Clone)]
pub struct BatchWriteMetrics {
    pub(crate) bytes: IntCounter,
    pub(crate) goodbytes: IntCounter,
    pub(crate) seconds: Counter,
    pub(crate) write_stalls: IntCounter,
    pub(crate) key_lower_too_big: IntCounter,

    pub(crate) unordered: IntCounter,
    pub(crate) codec_order: IntCounter,
    pub(crate) structured_order: IntCounter,
    _order_counts: IntCounterVec,

    pub(crate) step_stats: Counter,
    pub(crate) step_part_writing: Counter,
    pub(crate) step_inline: Counter,
}

impl BatchWriteMetrics {
    fn new(registry: &MetricsRegistry, name: &str) -> Self {
        let order_counts: IntCounterVec = registry.register(metric!(
                name: format!("mz_persist_{}_write_batch_order", name),
                help: "count of batches by the data ordering",
                var_labels: ["order"],
        ));
        let unordered = order_counts.with_label_values(&["unordered"]);
        let codec_order = order_counts.with_label_values(&["codec"]);
        let structured_order = order_counts.with_label_values(&["structured"]);

        BatchWriteMetrics {
            bytes: registry.register(metric!(
                name: format!("mz_persist_{}_bytes", name),
                help: format!("total encoded size of {} batches written", name),
            )),
            goodbytes: registry.register(metric!(
                name: format!("mz_persist_{}_goodbytes", name),
                help: format!("total logical size of {} batches written", name),
            )),
            seconds: registry.register(metric!(
                name: format!("mz_persist_{}_write_batch_part_seconds", name),
                help: format!("time spent writing {} batches", name),
            )),
            write_stalls: registry.register(metric!(
                name: format!("mz_persist_{}_write_stall_count", name),
                help: format!(
                    "count of {} writes stalling to await max outstanding reqs",
                    name
                ),
            )),
            key_lower_too_big: registry.register(metric!(
                name: format!("mz_persist_{}_key_lower_too_big", name),
                help: format!(
                    "count of {} writes that were unable to write a key lower, because the size threshold was too low",
                    name
                ),
            )),
            unordered,
            codec_order,
            structured_order,
            _order_counts: order_counts,
            step_stats: registry.register(metric!(
                name: format!("mz_persist_{}_step_stats", name),
                help: format!("time spent computing {} update stats", name),
            )),
            step_part_writing: registry.register(metric!(
                name: format!("mz_persist_{}_step_part_writing", name),
                help: format!("blocking time spent writing parts for {} updates", name),
            )),
            step_inline: registry.register(metric!(
                name: format!("mz_persist_{}_step_inline", name),
                help: format!("time spent encoding {} inline batches", name),
            )),
        }
    }
}

#[derive(Debug)]
pub struct CompactionMetrics {
    pub(crate) requested: IntCounter,
    pub(crate) dropped: IntCounter,
    pub(crate) disabled: IntCounter,
    pub(crate) skipped: IntCounter,
    pub(crate) started: IntCounter,
    pub(crate) applied: IntCounter,
    pub(crate) timed_out: IntCounter,
    pub(crate) failed: IntCounter,
    pub(crate) noop: IntCounter,
    pub(crate) seconds: Counter,
    pub(crate) concurrency_waits: IntCounter,
    pub(crate) queued_seconds: Counter,
    pub(crate) memory_violations: IntCounter,
    pub(crate) runs_compacted: IntCounter,
    pub(crate) chunks_compacted: IntCounter,
    pub(crate) not_all_prefetched: IntCounter,
    pub(crate) parts_prefetched: IntCounter,
    pub(crate) parts_waited: IntCounter,
    pub(crate) fast_path_eligible: IntCounter,
    pub(crate) admin_count: IntCounter,

    pub(crate) applied_exact_match: IntCounter,
    pub(crate) applied_subset_match: IntCounter,
    pub(crate) not_applied_too_many_updates: IntCounter,

    pub(crate) batch: BatchWriteMetrics,
    pub(crate) steps: CompactionStepTimings,
    pub(crate) schema_selection: CompactionSchemaSelection,

    pub(crate) _steps_vec: CounterVec,
}

impl CompactionMetrics {
    fn new(registry: &MetricsRegistry) -> Self {
        let step_timings: CounterVec = registry.register(metric!(
                name: "mz_persist_compaction_step_seconds",
                help: "time spent on individual steps of compaction",
                var_labels: ["step"],
        ));
        let schema_selection: CounterVec = registry.register(metric!(
            name: "mz_persist_compaction_schema_selection",
            help: "count of compactions and how we did schema selection",
            var_labels: ["selection"],
        ));

        CompactionMetrics {
            requested: registry.register(metric!(
                name: "mz_persist_compaction_requested",
                help: "count of total compaction requests",
            )),
            dropped: registry.register(metric!(
                name: "mz_persist_compaction_dropped",
                help: "count of total compaction requests dropped due to a full queue",
            )),
            disabled: registry.register(metric!(
                name: "mz_persist_compaction_disabled",
                help: "count of total compaction requests dropped because compaction was disabled",
            )),
            skipped: registry.register(metric!(
                name: "mz_persist_compaction_skipped",
                help: "count of compactions skipped due to heuristics",
            )),
            started: registry.register(metric!(
                name: "mz_persist_compaction_started",
                help: "count of compactions started",
            )),
            failed: registry.register(metric!(
                name: "mz_persist_compaction_failed",
                help: "count of compactions failed",
            )),
            applied: registry.register(metric!(
                name: "mz_persist_compaction_applied",
                help: "count of compactions applied to state",
            )),
            timed_out: registry.register(metric!(
                name: "mz_persist_compaction_timed_out",
                help: "count of compactions that timed out",
            )),
            noop: registry.register(metric!(
                name: "mz_persist_compaction_noop",
                help: "count of compactions discarded (obsolete)",
            )),
            seconds: registry.register(metric!(
                name: "mz_persist_compaction_seconds",
                help: "time spent in compaction",
            )),
            concurrency_waits: registry.register(metric!(
                name: "mz_persist_compaction_concurrency_waits",
                help: "count of compaction requests that ever blocked due to concurrency limit",
            )),
            queued_seconds: registry.register(metric!(
                name: "mz_persist_compaction_queued_seconds",
                help: "time that compaction requests spent queued",
            )),
            memory_violations: registry.register(metric!(
                name: "mz_persist_compaction_memory_violations",
                help: "count of compaction memory requirement violations",
            )),
            runs_compacted: registry.register(metric!(
                name: "mz_persist_compaction_runs_compacted",
                help: "count of runs compacted",
            )),
            chunks_compacted: registry.register(metric!(
                name: "mz_persist_compaction_chunks_compacted",
                help: "count of run chunks compacted",
            )),
            not_all_prefetched: registry.register(metric!(
                name: "mz_persist_compaction_not_all_prefetched",
                help: "count of compactions where not all inputs were prefetched",
            )),
            parts_prefetched: registry.register(metric!(
                name: "mz_persist_compaction_parts_prefetched",
                help: "count of compaction parts completely prefetched by the time they're needed",
            )),
            parts_waited: registry.register(metric!(
                name: "mz_persist_compaction_parts_waited",
                help: "count of compaction parts that had to be waited on",
            )),
            fast_path_eligible: registry.register(metric!(
                name: "mz_persist_compaction_fast_path_eligible",
                help: "count of compaction requests that could have used the fast-path optimization",
            )),
            admin_count: registry.register(metric!(
                name: "mz_persist_compaction_admin_count",
                help: "count of compaction requests that were performed by admin tooling",
            )),
            applied_exact_match: registry.register(metric!(
                name: "mz_persist_compaction_applied_exact_match",
                help: "count of merge results that exactly replaced a SpineBatch",
            )),
            applied_subset_match: registry.register(metric!(
                name: "mz_persist_compaction_applied_subset_match",
                help: "count of merge results that replaced a subset of a SpineBatch",
            )),
            not_applied_too_many_updates: registry.register(metric!(
                name: "mz_persist_compaction_not_applied_too_many_updates",
                help: "count of merge results that did not apply due to too many updates",
            )),
            batch: BatchWriteMetrics::new(registry, "compaction"),
            steps: CompactionStepTimings::new(step_timings.clone()),
            schema_selection: CompactionSchemaSelection::new(schema_selection.clone()),
            _steps_vec: step_timings,
        }
    }
}

#[derive(Debug)]
pub struct CompactionStepTimings {
    pub(crate) part_fetch_seconds: Counter,
    pub(crate) heap_population_seconds: Counter,
}

impl CompactionStepTimings {
    fn new(step_timings: CounterVec) -> CompactionStepTimings {
        CompactionStepTimings {
            part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
            heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
        }
    }
}

#[derive(Debug)]
pub struct CompactionSchemaSelection {
    pub(crate) recent_schema: Counter,
    pub(crate) no_schema: Counter,
}

impl CompactionSchemaSelection {
    fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
        CompactionSchemaSelection {
            recent_schema: schema_selection.with_label_values(&["recent"]),
            no_schema: schema_selection.with_label_values(&["none"]),
        }
    }
}

#[derive(Debug)]
pub struct GcMetrics {
    pub(crate) noop: IntCounter,
    pub(crate) started: IntCounter,
    pub(crate) finished: IntCounter,
    pub(crate) merged: IntCounter,
    pub(crate) seconds: Counter,
    pub(crate) steps: GcStepTimings,
}

#[derive(Debug)]
pub struct GcStepTimings {
    pub(crate) find_removable_rollups: Counter,
    pub(crate) fetch_seconds: Counter,
    pub(crate) find_deletable_blobs_seconds: Counter,
    pub(crate) delete_rollup_seconds: Counter,
    pub(crate) delete_batch_part_seconds: Counter,
    pub(crate) truncate_diff_seconds: Counter,
    pub(crate) remove_rollups_from_state: Counter,
    pub(crate) post_gc_calculations_seconds: Counter,
}

impl GcStepTimings {
    fn new(step_timings: CounterVec) -> Self {
        Self {
            find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
            fetch_seconds: step_timings.with_label_values(&["fetch"]),
            find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
            delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
            delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
            truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
            remove_rollups_from_state: step_timings
                .with_label_values(&["remove_rollups_from_state"]),
            post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
        }
    }
}

impl GcMetrics {
    fn new(registry: &MetricsRegistry) -> Self {
        let step_timings: CounterVec = registry.register(metric!(
                name: "mz_persist_gc_step_seconds",
                help: "time spent on individual steps of gc",
                var_labels: ["step"],
        ));
        GcMetrics {
            noop: registry.register(metric!(
                name: "mz_persist_gc_noop",
                help: "count of garbage collections skipped because they were already done",
            )),
            started: registry.register(metric!(
                name: "mz_persist_gc_started",
                help: "count of garbage collections started",
            )),
            finished: registry.register(metric!(
                name: "mz_persist_gc_finished",
                help: "count of garbage collections finished",
            )),
            merged: registry.register(metric!(
                name: "mz_persist_gc_merged_reqs",
                help: "count of garbage collection requests merged",
            )),
            seconds: registry.register(metric!(
                name: "mz_persist_gc_seconds",
                help: "time spent in garbage collections",
            )),
            steps: GcStepTimings::new(step_timings),
        }
    }
}

#[derive(Debug)]
pub struct LeaseMetrics {
    pub(crate) timeout_read: IntCounter,
    pub(crate) dropped_part: IntCounter,
}

impl LeaseMetrics {
    fn new(registry: &MetricsRegistry) -> Self {
        LeaseMetrics {
            timeout_read: registry.register(metric!(
                name: "mz_persist_lease_timeout_read",
                help: "count of readers whose lease timed out",
            )),
            dropped_part: registry.register(metric!(
                name: "mz_persist_lease_dropped_part",
                help: "count of LeasedBatchParts that were dropped without being politely returned",
            )),
        }
    }
}

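/// Increments the wrapped counter exactly once, when this value is dropped.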
struct IncOnDrop(IntCounter);

impl Drop for IncOnDrop {
    fn drop(&mut self) {
        self.0.inc()
    }
}

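/// A [RetryStream] wrapper that records attempt counts, backoff time, and (via
/// [IncOnDrop]) completion in a [RetryMetrics].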
pub struct MetricsRetryStream {
    retry: RetryStream,
    pub(crate) retries: IntCounter,
    sleep_seconds: Counter,
    _finished: IncOnDrop,
}

impl MetricsRetryStream {
    pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
        metrics.started.inc();
        MetricsRetryStream {
            retry,
            retries: metrics.retries.clone(),
            sleep_seconds: metrics.sleep_seconds.clone(),
            _finished: IncOnDrop(metrics.finished.clone()),
        }
    }

    /// How many times [Self::sleep] has been called.
    pub fn attempt(&self) -> usize {
        self.retry.attempt()
    }

    /// The next sleep (without jitter for easy printing in logs).
    pub fn next_sleep(&self) -> Duration {
        self.retry.next_sleep()
    }

    /// Executes the next sleep in the series.
    ///
    /// This isn't cancel-safe, so it consumes and returns self, to prevent
    /// accidental mis-use.
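    ///
    /// A sketch of the intended consume-and-rebind pattern (`try_op` is a
    /// hypothetical fallible async operation):
    /// ```ignore
    /// let mut retry = metrics.retries.snapshot.stream(retry_stream);
    /// while try_op().await.is_err() {
    ///     retry = retry.sleep().await;
    /// }
    /// ```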
    pub async fn sleep(self) -> Self {
        self.retries.inc();
        self.sleep_seconds
            .inc_by(self.retry.next_sleep().as_secs_f64());
        let retry = self.retry.sleep().await;
        MetricsRetryStream {
            retry,
            retries: self.retries,
            sleep_seconds: self.sleep_seconds,
            _finished: self._finished,
        }
    }
}

#[derive(Debug)]
pub struct CodecsMetrics {
    pub(crate) state: CodecMetrics,
    pub(crate) state_diff: CodecMetrics,
    pub(crate) batch: CodecMetrics,
    pub(crate) key: CodecMetrics,
    pub(crate) val: CodecMetrics,
    // Intentionally not adding time and diff because they're just
    // `{to,from}_le_bytes`.
}

#[derive(Debug)]
pub struct CodecMetrics {
    pub(crate) encode_count: IntCounter,
    pub(crate) encode_seconds: Counter,
    pub(crate) decode_count: IntCounter,
    pub(crate) decode_seconds: Counter,
}

impl CodecMetrics {
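    /// Runs the encode closure `f`, bumping the encode count and recording the
    /// time it took.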
    pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
        let now = Instant::now();
        let r = f();
        self.encode_count.inc();
        self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
        r
    }

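    /// Runs the decode closure `f`, bumping the decode count and recording the
    /// time it took.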
    pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
        let now = Instant::now();
        let r = f();
        self.decode_count.inc();
        self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
        r
    }
}

#[derive(Debug)]
pub struct StateMetrics {
    pub(crate) apply_spine_fast_path: IntCounter,
    pub(crate) apply_spine_slow_path: IntCounter,
    pub(crate) apply_spine_slow_path_lenient: IntCounter,
    pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
    pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
    pub(crate) apply_spine_flattened: IntCounter,
    pub(crate) update_state_noop_path: IntCounter,
    pub(crate) update_state_empty_path: IntCounter,
    pub(crate) update_state_fast_path: IntCounter,
    pub(crate) update_state_slow_path: IntCounter,
    pub(crate) rollup_at_seqno_migration: IntCounter,
    pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
    pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
    pub(crate) writer_added: IntCounter,
    pub(crate) writer_removed: IntCounter,
    pub(crate) force_apply_hostname: IntCounter,
    pub(crate) rollup_write_success: IntCounter,
    pub(crate) rollup_write_noop_latest: IntCounter,
    pub(crate) rollup_write_noop_truncated: IntCounter,
}

impl StateMetrics {
    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
        let rollup_write_noop: IntCounterVec = registry.register(metric!(
                name: "mz_persist_state_rollup_write_noop",
                help: "count of no-op rollup writes",
                var_labels: ["reason"],
        ));

        StateMetrics {
            apply_spine_fast_path: registry.register(metric!(
                name: "mz_persist_state_apply_spine_fast_path",
                help: "count of spine diff applications that hit the fast path",
            )),
            apply_spine_slow_path: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path",
                help: "count of spine diff applications that hit the slow path",
            )),
            apply_spine_slow_path_lenient: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path_lenient",
                help: "count of spine diff applications that hit the lenient compaction apply path",
            )),
            apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
                help: "count of adjustments made by the lenient compaction apply path",
            )),
            apply_spine_slow_path_with_reconstruction: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
                help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
            )),
            apply_spine_flattened: registry.register(metric!(
                name: "mz_persist_state_apply_spine_flattened",
                help: "count of spine diff applications that flatten the trace",
            )),
            update_state_noop_path: registry.register(metric!(
                name: "mz_persist_state_update_state_noop_path",
                help: "count of state update applications that no-oped due to shared state",
            )),
            update_state_empty_path: registry.register(metric!(
                name: "mz_persist_state_update_state_empty_path",
                help: "count of state update applications that found no new updates",
            )),
            update_state_fast_path: registry.register(metric!(
                name: "mz_persist_state_update_state_fast_path",
                help: "count of state update applications that hit the fast path",
            )),
            update_state_slow_path: registry.register(metric!(
                name: "mz_persist_state_update_state_slow_path",
                help: "count of state update applications that hit the slow path",
            )),
            rollup_at_seqno_migration: registry.register(metric!(
                name: "mz_persist_state_rollup_at_seqno_migration",
                help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
            )),
            fetch_recent_live_diffs_fast_path: registry.register(metric!(
                name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
                help: "count of fetch_recent_live_diffs that hit the fast path",
            )),
            fetch_recent_live_diffs_slow_path: registry.register(metric!(
                name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
                help: "count of fetch_recent_live_diffs that hit the slow path",
            )),
            writer_added: registry.register(metric!(
                name: "mz_persist_state_writer_added",
                help: "count of writers added to the state",
            )),
            writer_removed: registry.register(metric!(
                name: "mz_persist_state_writer_removed",
                help: "count of writers removed from the state",
            )),
            force_apply_hostname: registry.register(metric!(
                name: "mz_persist_state_force_applied_hostname",
                help: "count of times hostname diffs needed to be force applied",
            )),
            rollup_write_success: registry.register(metric!(
                name: "mz_persist_state_rollup_write_success",
                help: "count of rollups written successfully (may not be linked in to state)",
1249            )),
1250            rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
1251            rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
1252        }
1253    }
1254}
1255
1256#[derive(Debug)]
1257pub struct ShardsMetrics {
1258    // Unlike all the other metrics in here, ShardsMetrics intentionally uses
1259    // the DeleteOnDrop wrappers. A process might stop using a shard (drop all
1260    // handles to it) but e.g. the set of commands never changes.
1261    _count: ComputedIntGauge,
1262    since: mz_ore::metrics::IntGaugeVec,
1263    upper: mz_ore::metrics::IntGaugeVec,
1264    encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
1265    encoded_diff_size: mz_ore::metrics::IntCounterVec,
1266    hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
1267    spine_batch_count: mz_ore::metrics::UIntGaugeVec,
1268    batch_part_count: mz_ore::metrics::UIntGaugeVec,
1269    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
1270    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
1271    update_count: mz_ore::metrics::UIntGaugeVec,
1272    rollup_count: mz_ore::metrics::UIntGaugeVec,
1273    largest_batch_size: mz_ore::metrics::UIntGaugeVec,
1274    seqnos_held: mz_ore::metrics::UIntGaugeVec,
1275    seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
1276    gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
1277    gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
1278    gc_finished: mz_ore::metrics::IntCounterVec,
1279    compaction_applied: mz_ore::metrics::IntCounterVec,
1280    cmd_succeeded: mz_ore::metrics::IntCounterVec,
1281    usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
1282    usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
1283    usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
1284    usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
1285    usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
1286    pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
1287    pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
1288    pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
1289    stale_version: mz_ore::metrics::UIntGaugeVec,
1290    blob_gets: mz_ore::metrics::IntCounterVec,
1291    blob_sets: mz_ore::metrics::IntCounterVec,
1292    live_writers: mz_ore::metrics::UIntGaugeVec,
1293    unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
1294    backpressure_emitted_bytes: IntCounterVec,
1295    backpressure_last_backpressured_bytes: UIntGaugeVec,
1296    backpressure_retired_bytes: IntCounterVec,
1297    rewrite_part_count: UIntGaugeVec,
1298    inline_part_count: UIntGaugeVec,
1299    inline_part_bytes: UIntGaugeVec,
1300    compact_batches: UIntGaugeVec,
1301    compacting_batches: UIntGaugeVec,
1302    noncompact_batches: UIntGaugeVec,
1303    schema_registry_version_count: UIntGaugeVec,
1304    inline_backpressure_count: IntCounterVec,
1305    // We hand out `Arc<ShardMetrics>` to read and write handles, but store it
1306    // here as `Weak`. This allows us to discover when it's no longer in use
1307    // so we can remove it from the map.
1308    shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
1309}
1310
1311impl ShardsMetrics {
1312    fn new(registry: &MetricsRegistry) -> Self {
1313        let shards = Arc::new(Mutex::new(BTreeMap::new()));
1314        let shards_count = Arc::clone(&shards);
1315        ShardsMetrics {
1316            _count: registry.register_computed_gauge(
1317                metric!(
1318                    name: "mz_persist_shard_count",
1319                    help: "count of all active shards on this process",
1320                ),
1321                move || {
1322                    let mut ret = 0;
1323                    Self::compute(&shards_count, |_m| ret += 1);
1324                    ret
1325                },
1326            ),
1327            since: registry.register(metric!(
1328                name: "mz_persist_shard_since",
1329                help: "since by shard",
1330                var_labels: ["shard", "name"],
1331            )),
1332            upper: registry.register(metric!(
1333                name: "mz_persist_shard_upper",
1334                help: "upper by shard",
1335                var_labels: ["shard", "name"],
1336            )),
1337            encoded_rollup_size: registry.register(metric!(
1338                name: "mz_persist_shard_rollup_size_bytes",
1339                help: "total encoded rollup size by shard",
1340                var_labels: ["shard", "name"],
1341            )),
1342            encoded_diff_size: registry.register(metric!(
1343                name: "mz_persist_shard_diff_size_bytes",
1344                help: "total encoded diff size by shard",
1345                var_labels: ["shard", "name"],
1346            )),
1347            hollow_batch_count: registry.register(metric!(
1348                name: "mz_persist_shard_hollow_batch_count",
1349                help: "count of hollow batches by shard",
1350                var_labels: ["shard", "name"],
1351            )),
1352            spine_batch_count: registry.register(metric!(
1353                name: "mz_persist_shard_spine_batch_count",
1354                help: "count of spine batches by shard",
1355                var_labels: ["shard", "name"],
1356            )),
1357            batch_part_count: registry.register(metric!(
1358                name: "mz_persist_shard_batch_part_count",
1359                help: "count of batch parts by shard",
1360                var_labels: ["shard", "name"],
1361            )),
1362            batch_part_version_count: registry.register(metric!(
1363                name: "mz_persist_shard_batch_part_version_count",
1364                help: "count of batch parts by shard and version",
1365                var_labels: ["shard", "name", "version"],
1366            )),
1367            batch_part_version_bytes: registry.register(metric!(
1368                name: "mz_persist_shard_batch_part_version_bytes",
1369                help: "total bytes in batch parts by shard and version",
1370                var_labels: ["shard", "name", "version"],
1371            )),
1372            update_count: registry.register(metric!(
1373                name: "mz_persist_shard_update_count",
1374                help: "count of updates by shard",
1375                var_labels: ["shard", "name"],
1376            )),
1377            rollup_count: registry.register(metric!(
1378                name: "mz_persist_shard_rollup_count",
1379                help: "count of rollups by shard",
1380                var_labels: ["shard", "name"],
1381            )),
1382            largest_batch_size: registry.register(metric!(
1383                name: "mz_persist_shard_largest_batch_size",
1384                help: "largest encoded batch size by shard",
1385                var_labels: ["shard", "name"],
1386            )),
1387            seqnos_held: registry.register(metric!(
1388                name: "mz_persist_shard_seqnos_held",
1389                help: "maximum count of gc-ineligible states by shard",
1390                var_labels: ["shard", "name"],
1391            )),
1392            seqnos_since_last_rollup: registry.register(metric!(
1393                name: "mz_persist_shard_seqnos_since_last_rollup",
1394                help: "count of seqnos since last rollup",
1395                var_labels: ["shard", "name"],
1396            )),
1397            gc_seqno_held_parts: registry.register(metric!(
1398                name: "mz_persist_shard_gc_seqno_held_parts",
1399                help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
1400                var_labels: ["shard", "name"],
1401            )),
1402            gc_live_diffs: registry.register(metric!(
1403                name: "mz_persist_shard_gc_live_diffs",
1404                help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
1405                var_labels: ["shard", "name"],
1406            )),
1407            gc_finished: registry.register(metric!(
1408                name: "mz_persist_shard_gc_finished",
1409                help: "count of garbage collections finished by shard",
1410                var_labels: ["shard", "name"],
1411            )),
1412            compaction_applied: registry.register(metric!(
1413                name: "mz_persist_shard_compaction_applied",
1414                help: "count of compactions applied to state by shard",
1415                var_labels: ["shard", "name"],
1416            )),
1417            cmd_succeeded: registry.register(metric!(
1418                name: "mz_persist_shard_cmd_succeeded",
1419                help: "count of commands succeeded by shard",
1420                var_labels: ["shard", "name"],
1421            )),
1422            usage_current_state_batches_bytes: registry.register(metric!(
1423                name: "mz_persist_shard_usage_current_state_batches_bytes",
1424                help: "data in batches/parts referenced by current version of state",
1425                var_labels: ["shard", "name"],
1426            )),
1427            usage_current_state_rollups_bytes: registry.register(metric!(
1428                name: "mz_persist_shard_usage_current_state_rollups_bytes",
1429                help: "data in rollups referenced by current version of state",
1430                var_labels: ["shard", "name"],
1431            )),
1432            usage_referenced_not_current_state_bytes: registry.register(metric!(
1433                name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
1434                help: "data referenced only by a previous version of state",
1435                var_labels: ["shard", "name"],
1436            )),
1437            usage_not_leaked_not_referenced_bytes: registry.register(metric!(
1438                name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
1439                help: "data written by an active writer but not referenced by any version of state",
1440                var_labels: ["shard", "name"],
1441            )),
1442            usage_leaked_bytes: registry.register(metric!(
1443                name: "mz_persist_shard_usage_leaked_bytes",
1444                help: "data reclaimable by a leaked blob detector",
1445                var_labels: ["shard", "name"],
1446            )),
1447            pubsub_push_diff_applied: registry.register(metric!(
1448                name: "mz_persist_shard_pubsub_diff_applied",
1449                help: "number of diffs received via pubsub that applied",
1450                var_labels: ["shard", "name"],
1451            )),
1452            pubsub_push_diff_not_applied_stale: registry.register(metric!(
1453                name: "mz_persist_shard_pubsub_diff_not_applied_stale",
1454                help: "number of diffs received via pubsub that did not apply due to staleness",
1455                var_labels: ["shard", "name"],
1456            )),
1457            pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
1458                name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
1459                help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
1460                var_labels: ["shard", "name"],
1461            )),
1462            stale_version: registry.register(metric!(
1463                name: "mz_persist_shard_stale_version",
1464                help: "indicates whether the current version of the shard is less than the current version of the code",
1465                var_labels: ["shard", "name"],
1466            )),
1467            blob_gets: registry.register(metric!(
1468                name: "mz_persist_shard_blob_gets",
1469                help: "number of Blob::get calls for this shard",
1470                var_labels: ["shard", "name"],
1471            )),
1472            blob_sets: registry.register(metric!(
1473                name: "mz_persist_shard_blob_sets",
1474                help: "number of Blob::set calls for this shard",
1475                var_labels: ["shard", "name"],
1476            )),
1477            live_writers: registry.register(metric!(
1478                name: "mz_persist_shard_live_writers",
1479                help: "number of writers that have recently appended updates to this shard",
1480                var_labels: ["shard", "name"],
1481            )),
1482            unconsolidated_snapshot: registry.register(metric!(
1483                name: "mz_persist_shard_unconsolidated_snapshot",
1484                help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
1485                var_labels: ["shard", "name"],
1486            )),
1487            backpressure_emitted_bytes: registry.register(metric!(
1488                name: "mz_persist_backpressure_emitted_bytes",
1489                help: "A counter with the number of emitted bytes.",
1490                var_labels: ["shard", "name"],
1491            )),
1492            backpressure_last_backpressured_bytes: registry.register(metric!(
1493                name: "mz_persist_backpressure_last_backpressured_bytes",
1494                help: "The last count of bytes we are waiting to be retired in \
1495                    the operator. This cannot be directly compared to \
1496                    `retired_bytes`, but CAN indicate that backpressure is happening.",
1497                var_labels: ["shard", "name"],
1498            )),
1499            backpressure_retired_bytes: registry.register(metric!(
1500                name: "mz_persist_backpressure_retired_bytes",
1501                help:"A counter with the number of bytes retired by downstream processing.",
1502                var_labels: ["shard", "name"],
1503            )),
1504            rewrite_part_count: registry.register(metric!(
1505                name: "mz_persist_shard_rewrite_part_count",
1506                help: "count of batch parts with rewrites by shard",
1507                var_labels: ["shard", "name"],
1508            )),
1509            inline_part_count: registry.register(metric!(
1510                name: "mz_persist_shard_inline_part_count",
1511                help: "count of parts inline in shard metadata",
1512                var_labels: ["shard", "name"],
1513            )),
1514            inline_part_bytes: registry.register(metric!(
1515                name: "mz_persist_shard_inline_part_bytes",
1516                help: "total size of parts inline in shard metadata",
1517                var_labels: ["shard", "name"],
1518            )),
1519            compact_batches: registry.register(metric!(
1520                name: "mz_persist_shard_compact_batches",
1521                help: "number of fully compact batches in the shard",
1522                var_labels: ["shard", "name"],
1523            )),
1524            compacting_batches: registry.register(metric!(
1525                name: "mz_persist_shard_compacting_batches",
1526                help: "number of batches in the shard with compactions in progress",
1527                var_labels: ["shard", "name"],
1528            )),
1529            noncompact_batches: registry.register(metric!(
1530                name: "mz_persist_shard_noncompact_batches",
1531                help: "number of batches in the shard that aren't compact and have no ongoing compaction",
1532                var_labels: ["shard", "name"],
1533            )),
1534            schema_registry_version_count: registry.register(metric!(
1535                name: "mz_persist_shard_schema_registry_version_count",
1536                help: "count of versions in the schema registry",
1537                var_labels: ["shard", "name"],
1538            )),
1539            inline_backpressure_count: registry.register(metric!(
1540                name: "mz_persist_shard_inline_backpressure_count",
1541                help: "count of CaA attempts retried because of inline backpressure",
1542                var_labels: ["shard", "name"],
1543            )),
1544            shards,
1545        }
1546    }
1547
1548    pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
1549        let mut shards = self.shards.lock().expect("mutex poisoned");
1550        if let Some(shard) = shards.get(shard_id) {
1551            if let Some(shard) = shard.upgrade() {
1552                return Arc::clone(&shard);
1553            } else {
1554                assert!(shards.remove(shard_id).is_some());
1555            }
1556        }
1557        let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
1558        assert!(
1559            shards
1560                .insert(shard_id.clone(), Arc::downgrade(&shard))
1561                .is_none()
1562        );
1563        shard
1564    }
1565
1566    fn compute<F: FnMut(&ShardMetrics)>(
1567        shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
1568        mut f: F,
1569    ) {
1570        let mut shards = shards.lock().expect("mutex poisoned");
1571        let mut deleted_shards = Vec::new();
1572        for (shard_id, metrics) in shards.iter() {
1573            if let Some(metrics) = metrics.upgrade() {
1574                f(&metrics);
1575            } else {
1576                deleted_shards.push(shard_id.clone());
1577            }
1578        }
1579        for deleted_shard_id in deleted_shards {
1580            assert!(shards.remove(&deleted_shard_id).is_some());
1581        }
1582    }
1583}
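
// A minimal usage sketch (hypothetical names), assuming a `ShardsMetrics`
// reachable as `metrics.shards` on an initialized `Metrics`:
//
//     let shard_metrics = metrics.shards.shard(&shard_id, "my_collection");
//     shard_metrics.blob_gets.inc();
//
// The map holds only `Weak` entries, so once every `Arc<ShardMetrics>` handed
// out above is dropped, the next `shard` or `compute` call prunes the entry,
// and the DeleteOnDrop wrappers unregister the per-shard labels.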
1584
1585#[derive(Debug)]
1586pub struct ShardMetrics {
1587    pub shard_id: ShardId,
1588    pub name: String,
1589    pub since: DeleteOnDropGauge<AtomicI64, Vec<String>>,
1590    pub upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
1591    pub largest_batch_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1592    pub latest_rollup_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1593    pub encoded_diff_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1594    pub hollow_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1595    pub spine_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1596    pub batch_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1597    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
1598    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
1599    batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
1600    pub update_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1601    pub rollup_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1602    pub seqnos_held: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1603    pub seqnos_since_last_rollup: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1604    pub gc_seqno_held_parts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1605    pub gc_live_diffs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1606    pub usage_current_state_batches_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1607    pub usage_current_state_rollups_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1608    pub usage_referenced_not_current_state_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1609    pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1610    pub usage_leaked_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1611    pub gc_finished: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1612    pub compaction_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1613    pub cmd_succeeded: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1614    pub pubsub_push_diff_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1615    pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1616    pub pubsub_push_diff_not_applied_out_of_order: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1617    pub stale_version: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1618    pub blob_gets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1619    pub blob_sets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1620    pub live_writers: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1621    pub unconsolidated_snapshot: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1622    pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
1623    pub backpressure_last_backpressured_bytes: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
1624    pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
1625    pub rewrite_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1626    pub inline_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1627    pub inline_part_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1628    pub compact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1629    pub compacting_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1630    pub noncompact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1631    pub schema_registry_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1632    pub inline_backpressure_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1633}
1634
1635impl ShardMetrics {
1636    pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
1637        let shard = shard_id.to_string();
1638        ShardMetrics {
1639            shard_id: *shard_id,
1640            name: name.to_string(),
1641            since: shards_metrics
1642                .since
1643                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1644            upper: shards_metrics
1645                .upper
1646                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1647            latest_rollup_size: shards_metrics
1648                .encoded_rollup_size
1649                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1650            encoded_diff_size: shards_metrics
1651                .encoded_diff_size
1652                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1653            hollow_batch_count: shards_metrics
1654                .hollow_batch_count
1655                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1656            spine_batch_count: shards_metrics
1657                .spine_batch_count
1658                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1659            batch_part_count: shards_metrics
1660                .batch_part_count
1661                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1662            batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
1663            batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
1664            batch_part_version_map: Mutex::new(BTreeMap::new()),
1665            update_count: shards_metrics
1666                .update_count
1667                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1668            rollup_count: shards_metrics
1669                .rollup_count
1670                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1671            largest_batch_size: shards_metrics
1672                .largest_batch_size
1673                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1674            seqnos_held: shards_metrics
1675                .seqnos_held
1676                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1677            seqnos_since_last_rollup: shards_metrics
1678                .seqnos_since_last_rollup
1679                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1680            gc_seqno_held_parts: shards_metrics
1681                .gc_seqno_held_parts
1682                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1683            gc_live_diffs: shards_metrics
1684                .gc_live_diffs
1685                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1686            gc_finished: shards_metrics
1687                .gc_finished
1688                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1689            compaction_applied: shards_metrics
1690                .compaction_applied
1691                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1692            cmd_succeeded: shards_metrics
1693                .cmd_succeeded
1694                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1695            usage_current_state_batches_bytes: shards_metrics
1696                .usage_current_state_batches_bytes
1697                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1698            usage_current_state_rollups_bytes: shards_metrics
1699                .usage_current_state_rollups_bytes
1700                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1701            usage_referenced_not_current_state_bytes: shards_metrics
1702                .usage_referenced_not_current_state_bytes
1703                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1704            usage_not_leaked_not_referenced_bytes: shards_metrics
1705                .usage_not_leaked_not_referenced_bytes
1706                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1707            usage_leaked_bytes: shards_metrics
1708                .usage_leaked_bytes
1709                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1710            pubsub_push_diff_applied: shards_metrics
1711                .pubsub_push_diff_applied
1712                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1713            pubsub_push_diff_not_applied_stale: shards_metrics
1714                .pubsub_push_diff_not_applied_stale
1715                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1716            pubsub_push_diff_not_applied_out_of_order: shards_metrics
1717                .pubsub_push_diff_not_applied_out_of_order
1718                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1719            stale_version: shards_metrics
1720                .stale_version
1721                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1722            blob_gets: shards_metrics
1723                .blob_gets
1724                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1725            blob_sets: shards_metrics
1726                .blob_sets
1727                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1728            live_writers: shards_metrics
1729                .live_writers
1730                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1731            unconsolidated_snapshot: shards_metrics
1732                .unconsolidated_snapshot
1733                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1734            backpressure_emitted_bytes: Arc::new(
1735                shards_metrics
1736                    .backpressure_emitted_bytes
1737                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1738            ),
1739            backpressure_last_backpressured_bytes: Arc::new(
1740                shards_metrics
1741                    .backpressure_last_backpressured_bytes
1742                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1743            ),
1744            backpressure_retired_bytes: Arc::new(
1745                shards_metrics
1746                    .backpressure_retired_bytes
1747                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1748            ),
1749            rewrite_part_count: shards_metrics
1750                .rewrite_part_count
1751                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1752            inline_part_count: shards_metrics
1753                .inline_part_count
1754                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1755            inline_part_bytes: shards_metrics
1756                .inline_part_bytes
1757                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1758            compact_batches: shards_metrics
1759                .compact_batches
1760                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1761            compacting_batches: shards_metrics
1762                .compacting_batches
1763                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1764            noncompact_batches: shards_metrics
1765                .noncompact_batches
1766                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1767            schema_registry_version_count: shards_metrics
1768                .schema_registry_version_count
1769                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1770            inline_backpressure_count: shards_metrics
1771                .inline_backpressure_count
1772                .get_delete_on_drop_metric(vec![shard, name.to_string()]),
1773        }
1774    }
1775
1776    pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
1777        self.since.set(encode_ts_metric(since))
1778    }
1779
1780    pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
1781        self.upper.set(encode_ts_metric(upper))
1782    }
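
    // A minimal sketch of reporting frontiers through the setters above,
    // assuming a `u64` timestamp type (the values are hypothetical):
    //
    //     shard_metrics.set_since(&Antichain::from_elem(10u64));
    //     shard_metrics.set_upper(&Antichain::from_elem(20u64));
    //
    // The `encode_ts_metric` helper (defined elsewhere in this module) maps
    // the antichain to an i64 so the frontier can be graphed in Prometheus.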
1783
1784    pub(crate) fn set_batch_part_versions<'a>(
1785        &self,
1786        batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
1787    ) {
1788        let mut map = self
1789            .batch_part_version_map
1790            .lock()
1791            .expect("mutex should not be poisoned");
1792        // NB: It's a bit sus that the below assumes that no one else is
1793        // concurrently modifying the atomics in the gauges, but we're holding
1794        // the mutex this whole time, so it should be true.
1795
1796        // We want to do this in a way that avoids allocating (e.g. building up a
1797        // map of sums). First reset everything.
1798        for x in map.values() {
1799            x.batch_part_version_count.set(0);
1800            x.batch_part_version_bytes.set(0);
1801        }
1802
1803        // Then go through the iterator, creating new entries as necessary and
1804        // adding.
1805        for (key, bytes) in batch_parts_by_version {
1806            if !map.contains_key(key) {
1807                map.insert(
1808                    key.to_owned(),
1809                    BatchPartVersionMetrics {
1810                        batch_part_version_count: self
1811                            .batch_part_version_count
1812                            .get_delete_on_drop_metric(vec![
1813                                self.shard_id.to_string(),
1814                                self.name.clone(),
1815                                key.to_owned(),
1816                            ]),
1817                        batch_part_version_bytes: self
1818                            .batch_part_version_bytes
1819                            .get_delete_on_drop_metric(vec![
1820                                self.shard_id.to_string(),
1821                                self.name.clone(),
1822                                key.to_owned(),
1823                            ]),
1824                    },
1825                );
1826            }
1827            let value = map.get(key).expect("inserted above");
1828            value.batch_part_version_count.inc();
1829            value.batch_part_version_bytes.add(u64::cast_from(bytes));
1830        }
1831    }
1832}
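
// A minimal sketch of feeding `set_batch_part_versions`, assuming per-part
// (version, encoded bytes) pairs gathered from state (values hypothetical):
//
//     let parts = [("v0.98.0", 1024usize), ("v0.99.0", 2048), ("v0.99.0", 512)];
//     shard_metrics.set_batch_part_versions(parts.iter().map(|(v, b)| (*v, *b)));
//
// Repeated versions accumulate: after the reset-then-add loop above, the
// "v0.99.0" count gauge reads 2 and its bytes gauge reads 2560, without first
// building a separate map of per-version sums.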
1833
1834#[derive(Debug)]
1835pub struct BatchPartVersionMetrics {
1836    pub batch_part_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1837    pub batch_part_version_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1838}
1839
1840/// Metrics recorded by audits of persist usage
1841#[derive(Debug)]
1842pub struct UsageAuditMetrics {
1843    /// Size of all batch parts stored in Blob
1844    pub blob_batch_part_bytes: UIntGauge,
1845    /// Count of batch parts stored in Blob
1846    pub blob_batch_part_count: UIntGauge,
1847    /// Size of all state rollups stored in Blob
1848    pub blob_rollup_bytes: UIntGauge,
1849    /// Count of state rollups stored in Blob
1850    pub blob_rollup_count: UIntGauge,
1851    /// Size of Blob
1852    pub blob_bytes: UIntGauge,
1853    /// Count of all blobs
1854    pub blob_count: UIntGauge,
1855    /// Time spent fetching blob metadata
1856    pub step_blob_metadata: Counter,
1857    /// Time spent fetching state versions
1858    pub step_state: Counter,
1859    /// Time spent doing math
1860    pub step_math: Counter,
1861}
1862
1863impl UsageAuditMetrics {
1864    fn new(registry: &MetricsRegistry) -> Self {
1865        let step_timings: CounterVec = registry.register(metric!(
1866                name: "mz_persist_audit_step_seconds",
1867                help: "time spent on individual steps of audit",
1868                var_labels: ["step"],
1869        ));
1870        UsageAuditMetrics {
1871            blob_batch_part_bytes: registry.register(metric!(
1872                name: "mz_persist_audit_blob_batch_part_bytes",
1873                help: "total size of batch parts in blob",
1874            )),
1875            blob_batch_part_count: registry.register(metric!(
1876                name: "mz_persist_audit_blob_batch_part_count",
1877                help: "count of batch parts in blob",
1878            )),
1879            blob_rollup_bytes: registry.register(metric!(
1880                name: "mz_persist_audit_blob_rollup_bytes",
1881                help: "total size of state rollups stored in blob",
1882            )),
1883            blob_rollup_count: registry.register(metric!(
1884                name: "mz_persist_audit_blob_rollup_count",
1885                help: "count of all state rollups in blob",
1886            )),
1887            blob_bytes: registry.register(metric!(
1888                name: "mz_persist_audit_blob_bytes",
1889                help: "total size of blob",
1890            )),
1891            blob_count: registry.register(metric!(
1892                name: "mz_persist_audit_blob_count",
1893                help: "count of all blobs",
1894            )),
1895            step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
1896            step_state: step_timings.with_label_values(&["state"]),
1897            step_math: step_timings.with_label_values(&["math"]),
1898        }
1899    }
1900}
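
// A hedged sketch of how an audit step might record its timing; `audit` is a
// `UsageAuditMetrics` and `fetch_blob_metadata` stands in for the real work:
//
//     let start = std::time::Instant::now();
//     let _metadata = fetch_blob_metadata();
//     audit.step_blob_metadata.inc_by(start.elapsed().as_secs_f64());
//
// Each step is one label of the shared `mz_persist_audit_step_seconds`
// counter vec, so the relative cost of the metadata, state, and math steps
// can be compared directly.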
1901
1902/// Represents a change in a number of updates kept in a data structure
1903/// (e.g., a buffer length or capacity change).
1904#[derive(Debug)]
1905pub enum UpdateDelta {
1906    /// A negative delta in the number of updates.
1907    Negative(u64),
1908    /// A non-negative delta in the number of updates.
1909    NonNegative(u64),
1910}
1911
1912impl UpdateDelta {
1913    /// Creates a new `UpdateDelta` from the difference between a new value
1914    /// for a number of updates and the corresponding old value.
1915    pub fn new(new: usize, old: usize) -> Self {
1916        if new < old {
1917            UpdateDelta::Negative(CastFrom::cast_from(old - new))
1918        } else {
1919            UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
1920        }
1921    }
1922}
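
// A minimal sketch of the arithmetic in `UpdateDelta::new` (new value first,
// old value second):
//
//     assert!(matches!(UpdateDelta::new(3, 5), UpdateDelta::Negative(2)));
//     assert!(matches!(UpdateDelta::new(7, 5), UpdateDelta::NonNegative(2)));
//     assert!(matches!(UpdateDelta::new(5, 5), UpdateDelta::NonNegative(0)));
//
// A zero delta is classified as `NonNegative`, which the sink metrics below
// then treat as a no-op rather than as an insertion or capacity increase.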
1923
1924/// Metrics for the persist sink. (While this lies slightly outside the usual
1925/// abstraction boundary of the client, it's convenient to manage them together.)
1926#[derive(Debug, Clone)]
1927pub struct SinkMetrics {
1928    /// Cumulative record insertions made to the correction buffer across workers
1929    correction_insertions_total: IntCounter,
1930    /// Cumulative record deletions made to the correction buffer across workers
1931    correction_deletions_total: IntCounter,
1932    /// Cumulative capacity increases made to the correction buffer across workers
1933    correction_capacity_increases_total: IntCounter,
1934    /// Cumulative capacity decreases made to the correction buffer across workers
1935    correction_capacity_decreases_total: IntCounter,
1936    /// Maximum length observed for any one correction buffer per worker
1937    correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
1938    /// Maximum capacity observed for any one correction buffer per worker
1939    correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
1940}
1941
1942impl SinkMetrics {
1943    fn new(registry: &MetricsRegistry) -> Self {
1944        SinkMetrics {
1945            correction_insertions_total: registry.register(metric!(
1946                name: "mz_persist_sink_correction_insertions_total",
1947                help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
1948            )),
1949            correction_deletions_total: registry.register(metric!(
1950                name: "mz_persist_sink_correction_deletions_total",
1951                help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
1952            )),
1953            correction_capacity_increases_total: registry.register(metric!(
1954                name: "mz_persist_sink_correction_capacity_increases_total",
1955                help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
1956            )),
1957            correction_capacity_decreases_total: registry.register(metric!(
1958                name: "mz_persist_sink_correction_capacity_decreases_total",
1959                help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
1960            )),
1961            correction_max_per_sink_worker_len_updates: registry.register(metric!(
1962                name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
1963                help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
1964                var_labels: ["worker_id"],
1965            )),
1966            correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
1967                name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
1968                help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
1969                var_labels: ["worker_id"],
1970            )),
1971        }
1972    }
1973
1974    /// Obtains a `SinkWorkerMetrics` instance, which allows for metric reporting
1975    /// from a specific `persist_sink` instance for a given worker. The reports will
1976    /// update metrics shared across workers, but provide per-worker contributions
1977    /// to them.
1978    pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
1979        let worker = worker_id.to_string();
1980        let correction_max_per_sink_worker_len_updates = self
1981            .correction_max_per_sink_worker_len_updates
1982            .with_label_values(&[&worker]);
1983        let correction_max_per_sink_worker_capacity_updates = self
1984            .correction_max_per_sink_worker_capacity_updates
1985            .with_label_values(&[&worker]);
1986        SinkWorkerMetrics {
1987            correction_max_per_sink_worker_len_updates,
1988            correction_max_per_sink_worker_capacity_updates,
1989        }
1990    }
1991
1992    /// Reports updates to the length and capacity of the correction buffer in the
1993    /// `write_batches` operator of a `persist_sink`.
1994    ///
1995    /// This method updates monotonic metrics based on the deltas and thus can be
1996    /// called across workers and instances of `persist_sink`.
1997    pub fn report_correction_update_deltas(
1998        &self,
1999        correction_len_delta: UpdateDelta,
2000        correction_cap_delta: UpdateDelta,
2001    ) {
2002        // Report insertions or deletions.
2003        match correction_len_delta {
2004            UpdateDelta::NonNegative(delta) => {
2005                if delta > 0 {
2006                    self.correction_insertions_total.inc_by(delta)
2007                }
2008            }
2009            UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
2010        }
2011        // Report capacity increases or decreases.
2012        match correction_cap_delta {
2013            UpdateDelta::NonNegative(delta) => {
2014                if delta > 0 {
2015                    self.correction_capacity_increases_total.inc_by(delta)
2016                }
2017            }
2018            UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
2019        }
2020    }
2021}
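
// A hedged sketch of the intended call pattern from a `persist_sink` worker;
// `sink` is a `SinkMetrics` and `buffer` is a hypothetical correction buffer:
//
//     let worker = sink.for_worker(worker_id);
//     let (old_len, old_cap) = (buffer.len(), buffer.capacity());
//     buffer.extend(new_updates);
//     sink.report_correction_update_deltas(
//         UpdateDelta::new(buffer.len(), old_len),
//         UpdateDelta::new(buffer.capacity(), old_cap),
//     );
//     worker.report_correction_update_totals(buffer.len(), buffer.capacity());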
2022
2023/// Metrics for the persist sink that are labeled per-worker.
2024#[derive(Clone, Debug)]
2025pub struct SinkWorkerMetrics {
2026    correction_max_per_sink_worker_len_updates: UIntGauge,
2027    correction_max_per_sink_worker_capacity_updates: UIntGauge,
2028}
2029
2030impl SinkWorkerMetrics {
2031    /// Reports the length and capacity of the correction buffer in the `write_batches`
2032    /// operator of `persist_sink`.
2033    ///
2034    /// This method is used to update metrics that are kept per worker.
2035    pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
2036        // Maintain per-worker peaks.
2037        let correction_len = CastFrom::cast_from(correction_len);
2038        if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
2039            self.correction_max_per_sink_worker_len_updates
2040                .set(correction_len);
2041        }
2042        let correction_cap = CastFrom::cast_from(correction_cap);
2043        if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
2044            self.correction_max_per_sink_worker_capacity_updates
2045                .set(correction_cap);
2046        }
2047    }
2048}
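
// A minimal sketch of the ratcheting behavior above (values hypothetical):
//
//     worker.report_correction_update_totals(10, 16);
//     worker.report_correction_update_totals(25, 32); // raises both peaks
//     worker.report_correction_update_totals(5, 8); // below the peaks: no-op
//
// The gauges only ever move upward, so they record per-worker high-water
// marks over the process lifetime rather than current values.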
2049
2050/// A minimal set of metrics imported into Honeycomb for alerting.
2051#[derive(Debug)]
2052pub struct AlertsMetrics {
2053    pub(crate) blob_failures: IntCounter,
2054    pub(crate) consensus_failures: IntCounter,
2055}
2056
2057impl AlertsMetrics {
2058    fn new(registry: &MetricsRegistry) -> Self {
2059        AlertsMetrics {
2060            blob_failures: registry.register(metric!(
2061                name: "mz_persist_blob_failures",
2062                help: "count of all blob operation failures",
2063                const_labels: {"honeycomb" => "import"},
2064            )),
2065            consensus_failures: registry.register(metric!(
2066                name: "mz_persist_consensus_failures",
2067                help: "count of determinate consensus operation failures",
2068                const_labels: {"honeycomb" => "import"},
2069            )),
2070        }
2071    }
2072}
2073
2074/// Metrics for the PubSubServer implementation.
2075#[derive(Debug)]
2076pub struct PubSubServerMetrics {
2077    pub(crate) active_connections: UIntGauge,
2078    pub(crate) broadcasted_diff_count: IntCounter,
2079    pub(crate) broadcasted_diff_bytes: IntCounter,
2080    pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,
2081
2082    pub(crate) push_seconds: Counter,
2083    pub(crate) subscribe_seconds: Counter,
2084    pub(crate) unsubscribe_seconds: Counter,
2085    pub(crate) connection_cleanup_seconds: Counter,
2086
2087    pub(crate) push_call_count: IntCounter,
2088    pub(crate) subscribe_call_count: IntCounter,
2089    pub(crate) unsubscribe_call_count: IntCounter,
2090}
2091
2092impl PubSubServerMetrics {
2093    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
2094        let op_timings: CounterVec = registry.register(metric!(
2095                name: "mz_persist_pubsub_server_operation_seconds",
2096                help: "time spent in pubsub server performing each operation",
2097                var_labels: ["op"],
2098        ));
2099        let call_count: IntCounterVec = registry.register(metric!(
2100                name: "mz_persist_pubsub_server_call_count",
2101                help: "count of each pubsub server message received",
2102                var_labels: ["call"],
2103        ));
2104
2105        Self {
2106            active_connections: registry.register(metric!(
2107                    name: "mz_persist_pubsub_server_active_connections",
2108                    help: "number of active connections to server",
2109            )),
2110            broadcasted_diff_count: registry.register(metric!(
2111                    name: "mz_persist_pubsub_server_broadcasted_diff_count",
2112                    help: "count of total broadcast diff messages sent",
2113            )),
2114            broadcasted_diff_bytes: registry.register(metric!(
2115                    name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
2116                    help: "count of total broadcast diff bytes sent",
2117            )),
2118            broadcasted_diff_dropped_channel_full: registry.register(metric!(
2119                    name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
2120                    help: "count of diffs dropped due to full connection channel",
2121            )),
2122
2123            push_seconds: op_timings.with_label_values(&["push"]),
2124            subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
2125            unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
2126            connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),
2127
2128            push_call_count: call_count.with_label_values(&["push"]),
2129            subscribe_call_count: call_count.with_label_values(&["subscribe"]),
2130            unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
2131        }
2132    }
2133}
2134
2135/// Metrics for the PubSubClient implementation.
2136#[derive(Debug)]
2137pub struct PubSubClientMetrics {
2138    pub sender: PubSubClientSenderMetrics,
2139    pub receiver: PubSubClientReceiverMetrics,
2140    pub grpc_connection: PubSubGrpcClientConnectionMetrics,
2141}
2142
2143impl PubSubClientMetrics {
2144    fn new(registry: &MetricsRegistry) -> Self {
2145        PubSubClientMetrics {
2146            sender: PubSubClientSenderMetrics::new(registry),
2147            receiver: PubSubClientReceiverMetrics::new(registry),
2148            grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
2149        }
2150    }
2151}
2152
2153#[derive(Debug)]
2154pub struct PubSubGrpcClientConnectionMetrics {
2155    pub(crate) connected: UIntGauge,
2156    pub(crate) connection_established_count: IntCounter,
2157    pub(crate) connect_call_attempt_count: IntCounter,
2158    pub(crate) broadcast_recv_lagged_count: IntCounter,
2159    pub(crate) grpc_error_count: IntCounter,
2160}
2161
2162impl PubSubGrpcClientConnectionMetrics {
2163    fn new(registry: &MetricsRegistry) -> Self {
2164        Self {
2165            connected: registry.register(metric!(
2166                    name: "mz_persist_pubsub_client_grpc_connected",
2167                    help: "whether the grpc client is currently connected",
2168            )),
2169            connection_established_count: registry.register(metric!(
2170                    name: "mz_persist_pubsub_client_grpc_connection_established_count",
2171                    help: "count of grpc connection establishments to pubsub server",
2172            )),
2173            connect_call_attempt_count: registry.register(metric!(
2174                    name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
2175                    help: "count of connection call attempts (including retries) to pubsub server",
2176            )),
2177            broadcast_recv_lagged_count: registry.register(metric!(
2178                    name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
2179                    help: "times a message was missed by broadcast receiver due to lag",
2180            )),
2181            grpc_error_count: registry.register(metric!(
2182                    name: "mz_persist_pubsub_client_grpc_error_count",
2183                    help: "count of grpc errors received",
2184            )),
2185        }
2186    }
2187}
2188
2189#[derive(Clone, Debug)]
2190pub struct PubSubClientReceiverMetrics {
2191    pub(crate) push_received: IntCounter,
2192    pub(crate) unknown_message_received: IntCounter,
2193    pub(crate) approx_diff_latency_seconds: Histogram,
2194
2195    pub(crate) state_pushed_diff_fast_path: IntCounter,
2196    pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
2197    pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
2198}
2199
2200impl PubSubClientReceiverMetrics {
2201    fn new(registry: &MetricsRegistry) -> Self {
2202        let call_received: IntCounterVec = registry.register(metric!(
2203                name: "mz_persist_pubsub_client_call_received",
2204                help: "times a pubsub client call was received",
2205                var_labels: ["call"],
2206        ));
2207
2208        Self {
2209            push_received: call_received.with_label_values(&["push"]),
2210            unknown_message_received: call_received.with_label_values(&["unknown"]),
2211            approx_diff_latency_seconds: registry.register(metric!(
2212                name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
2213                help: "histogram of (approximate) latency between sending a diff and applying it",
2214                buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
2215            )),
2216
2217            state_pushed_diff_fast_path: registry.register(metric!(
2218                name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
2219                help: "count fast-path state push_diff calls",
2220            )),
2221            state_pushed_diff_slow_path_succeeded: registry.register(metric!(
2222                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
2223                help: "count of successful slow-path state push_diff calls",
2224            )),
2225            state_pushed_diff_slow_path_failed: registry.register(metric!(
2226                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
2227                help: "count of unsuccessful slow-path state push_diff calls",
2228            )),
2229        }
2230    }
2231}
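
// The exponential buckets above cover 0.001 * 2^0 through 0.001 * 2^12
// seconds, i.e. 1ms up to ~4.1s, with slower applies landing in the implicit
// +Inf bucket:
//
//     let buckets = prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets");
//     assert_eq!(buckets.len(), 13);
//     assert_eq!(buckets.first(), Some(&0.001));
//     assert!((buckets.last().unwrap() - 4.096).abs() < 1e-9);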
2232
2233#[derive(Debug)]
2234pub struct PubSubClientSenderMetrics {
2235    pub push: PubSubClientCallMetrics,
2236    pub subscribe: PubSubClientCallMetrics,
2237    pub unsubscribe: PubSubClientCallMetrics,
2238}
2239
2240#[derive(Debug)]
2241pub struct PubSubClientCallMetrics {
2242    pub(crate) succeeded: IntCounter,
2243    pub(crate) bytes_sent: IntCounter,
2244    pub(crate) failed: IntCounter,
2245}
2246
2247impl PubSubClientSenderMetrics {
2248    fn new(registry: &MetricsRegistry) -> Self {
2249        let call_bytes_sent: IntCounterVec = registry.register(metric!(
2250                name: "mz_persist_pubsub_client_call_bytes_sent",
2251                help: "number of bytes sent for a given pubsub client call",
2252                var_labels: ["call"],
2253        ));
2254        let call_succeeded: IntCounterVec = registry.register(metric!(
2255                name: "mz_persist_pubsub_client_call_succeeded",
2256                help: "times a pubsub client call succeeded",
2257                var_labels: ["call"],
2258        ));
2259        let call_failed: IntCounterVec = registry.register(metric!(
2260                name: "mz_persist_pubsub_client_call_failed",
2261                help: "times a pubsub client call failed",
2262                var_labels: ["call"],
2263        ));
2264
2265        Self {
2266            push: PubSubClientCallMetrics {
2267                succeeded: call_succeeded.with_label_values(&["push"]),
2268                failed: call_failed.with_label_values(&["push"]),
2269                bytes_sent: call_bytes_sent.with_label_values(&["push"]),
2270            },
2271            subscribe: PubSubClientCallMetrics {
2272                succeeded: call_succeeded.with_label_values(&["subscribe"]),
2273                failed: call_failed.with_label_values(&["subscribe"]),
2274                bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
2275            },
2276            unsubscribe: PubSubClientCallMetrics {
2277                succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
2278                failed: call_failed.with_label_values(&["unsubscribe"]),
2279                bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
2280            },
2281        }
2282    }
2283}
2284
2285#[derive(Debug)]
2286pub struct LocksMetrics {
2287    pub(crate) applier_read_cacheable: LockMetrics,
2288    pub(crate) applier_read_noncacheable: LockMetrics,
2289    pub(crate) applier_write: LockMetrics,
2290    pub(crate) watch: LockMetrics,
2291}
2292
2293#[derive(Debug, Clone)]
2294pub struct LockMetrics {
2295    pub(crate) acquire_count: IntCounter,
2296    pub(crate) blocking_acquire_count: IntCounter,
2297    pub(crate) blocking_seconds: Counter,
2298}
2299
2300#[derive(Debug)]
2301pub struct WatchMetrics {
2302    pub(crate) listen_woken_via_watch: IntCounter,
2303    pub(crate) listen_woken_via_sleep: IntCounter,
2304    pub(crate) listen_resolved_via_watch: IntCounter,
2305    pub(crate) listen_resolved_via_sleep: IntCounter,
2306    pub(crate) snapshot_woken_via_watch: IntCounter,
2307    pub(crate) snapshot_woken_via_sleep: IntCounter,
2308    pub(crate) notify_sent: IntCounter,
2309    pub(crate) notify_noop: IntCounter,
2310    pub(crate) notify_recv: IntCounter,
2311    pub(crate) notify_lagged: IntCounter,
2312    pub(crate) notify_wait_started: IntCounter,
2313    pub(crate) notify_wait_finished: IntCounter,
2314}
2315
2316impl WatchMetrics {
2317    fn new(registry: &MetricsRegistry) -> Self {
2318        WatchMetrics {
2319            listen_woken_via_watch: registry.register(metric!(
2320                name: "mz_persist_listen_woken_via_watch",
2321                help: "count of listen next batches wakes via watch notify",
2322            )),
2323            listen_woken_via_sleep: registry.register(metric!(
2324                name: "mz_persist_listen_woken_via_sleep",
2325                help: "count of listen next batches wakes via sleep",
2326            )),
2327            listen_resolved_via_watch: registry.register(metric!(
2328                name: "mz_persist_listen_resolved_via_watch",
2329                help: "count of listen next batches resolved via watch notify",
2330            )),
2331            listen_resolved_via_sleep: registry.register(metric!(
2332                name: "mz_persist_listen_resolved_via_sleep",
2333                help: "count of listen next batches resolved via sleep",
2334            )),
2335            snapshot_woken_via_watch: registry.register(metric!(
2336                name: "mz_persist_snapshot_woken_via_watch",
2337                help: "count of snapshot wakes via watch notify",
2338            )),
2339            snapshot_woken_via_sleep: registry.register(metric!(
2340                name: "mz_persist_snapshot_woken_via_sleep",
2341                help: "count of snapshot wakes via sleep",
2342            )),
2343            notify_sent: registry.register(metric!(
2344                name: "mz_persist_watch_notify_sent",
2345                help: "count of watch notifications sent to a non-empty broadcast channel",
2346            )),
2347            notify_noop: registry.register(metric!(
2348                name: "mz_persist_watch_notify_noop",
2349                help: "count of watch notifications sent to an broadcast channel",
2350            )),
2351            notify_recv: registry.register(metric!(
2352                name: "mz_persist_watch_notify_recv",
2353                help: "count of watch notifications received from the broadcast channel",
2354            )),
2355            notify_lagged: registry.register(metric!(
2356                name: "mz_persist_watch_notify_lagged",
2357                help: "count of lagged events in the watch notification broadcast channel",
2358            )),
2359            notify_wait_started: registry.register(metric!(
2360                name: "mz_persist_watch_notify_wait_started",
2361                help: "count of watch wait calls started",
2362            )),
2363            notify_wait_finished: registry.register(metric!(
2364                name: "mz_persist_watch_notify_wait_finished",
2365                help: "count of watch wait calls resolved",
2366            )),
2367        }
2368    }
2369}
2370
2371#[derive(Debug)]
2372pub struct PushdownMetrics {
2373    pub(crate) parts_filtered_count: IntCounter,
2374    pub(crate) parts_filtered_bytes: IntCounter,
2375    pub(crate) parts_fetched_count: IntCounter,
2376    pub(crate) parts_fetched_bytes: IntCounter,
2377    pub(crate) parts_audited_count: IntCounter,
2378    pub(crate) parts_audited_bytes: IntCounter,
2379    pub(crate) parts_inline_count: IntCounter,
2380    pub(crate) parts_inline_bytes: IntCounter,
2381    pub(crate) parts_faked_count: IntCounter,
2382    pub(crate) parts_faked_bytes: IntCounter,
2383    pub(crate) parts_stats_trimmed_count: IntCounter,
2384    pub(crate) parts_stats_trimmed_bytes: IntCounter,
2385    pub(crate) parts_projection_trimmed_bytes: IntCounter,
2386    pub part_stats: PartStatsMetrics,
2387}
2388
2389impl PushdownMetrics {
2390    fn new(registry: &MetricsRegistry) -> Self {
2391        PushdownMetrics {
2392            parts_filtered_count: registry.register(metric!(
2393                name: "mz_persist_pushdown_parts_filtered_count",
2394                help: "count of parts filtered by pushdown",
2395            )),
2396            parts_filtered_bytes: registry.register(metric!(
2397                name: "mz_persist_pushdown_parts_filtered_bytes",
2398                help: "total size of parts filtered by pushdown in bytes",
2399            )),
2400            parts_fetched_count: registry.register(metric!(
2401                name: "mz_persist_pushdown_parts_fetched_count",
2402                help: "count of parts not filtered by pushdown",
2403            )),
2404            parts_fetched_bytes: registry.register(metric!(
2405                name: "mz_persist_pushdown_parts_fetched_bytes",
2406                help: "total size of parts not filtered by pushdown in bytes",
2407            )),
2408            parts_audited_count: registry.register(metric!(
2409                name: "mz_persist_pushdown_parts_audited_count",
2410                help: "count of parts fetched only for pushdown audit",
2411            )),
2412            parts_audited_bytes: registry.register(metric!(
2413                name: "mz_persist_pushdown_parts_audited_bytes",
2414                help: "total size of parts fetched only for pushdown audit",
2415            )),
2416            parts_inline_count: registry.register(metric!(
2417                name: "mz_persist_pushdown_parts_inline_count",
2418                help: "count of parts not fetched because they were inline",
2419            )),
2420            parts_inline_bytes: registry.register(metric!(
2421                name: "mz_persist_pushdown_parts_inline_bytes",
2422                help: "total size of parts not fetched because they were inline",
2423            )),
2424            parts_faked_count: registry.register(metric!(
2425                name: "mz_persist_pushdown_parts_faked_count",
2426                help: "count of parts faked because of aggressive projection pushdown",
2427            )),
2428            parts_faked_bytes: registry.register(metric!(
2429                name: "mz_persist_pushdown_parts_faked_bytes",
2430                help: "total size of parts replaced with fakes by aggressive projection pushdown",
2431            )),
2432            parts_stats_trimmed_count: registry.register(metric!(
2433                name: "mz_persist_pushdown_parts_stats_trimmed_count",
2434                help: "count of trimmed part stats",
2435            )),
2436            parts_stats_trimmed_bytes: registry.register(metric!(
2437                name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
2438                help: "total bytes trimmed from part stats",
2439            )),
2440            parts_projection_trimmed_bytes: registry.register(metric!(
2441                name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
2442                help: "total bytes trimmed from columnar data because of projection pushdown",
2443            )),
2444            part_stats: PartStatsMetrics::new(registry),
2445        }
2446    }
2447}
2448
2449#[derive(Debug)]
2450pub struct ConsolidationMetrics {
2451    pub(crate) parts_fetched: IntCounter,
2452    pub(crate) parts_skipped: IntCounter,
2453    pub(crate) parts_wasted: IntCounter,
2454    pub(crate) wrong_sort: IntCounter,
2455}
2456
2457impl ConsolidationMetrics {
2458    fn new(registry: &MetricsRegistry) -> Self {
2459        ConsolidationMetrics {
2460            parts_fetched: registry.register(metric!(
2461                name: "mz_persist_consolidation_parts_fetched_count",
2462                help: "count of parts that were fetched and used during consolidation",
2463            )),
2464            parts_skipped: registry.register(metric!(
2465                name: "mz_persist_consolidation_parts_skipped_count",
2466                help: "count of parts that were never needed during consolidation",
2467            )),
2468            parts_wasted: registry.register(metric!(
2469                name: "mz_persist_consolidation_parts_wasted_count",
2470                help: "count of parts that were fetched but not needed during consolidation",
2471            )),
2472            wrong_sort: registry.register(metric!(
2473                name: "mz_persist_consolidation_wrong_sort_count",
2474                help: "count of runs that were sorted using the wrong ordering for the current consolidation",
2475            )),
2476        }
2477    }
2478}
2479
2480#[derive(Debug)]
2481pub struct BlobMemCache {
2482    pub(crate) size_blobs: UIntGauge,
2483    pub(crate) size_bytes: UIntGauge,
2484    pub(crate) hits_blobs: IntCounter,
2485    pub(crate) hits_bytes: IntCounter,
2486    pub(crate) evictions: IntCounter,
2487}
2488
2489impl BlobMemCache {
2490    fn new(registry: &MetricsRegistry) -> Self {
2491        BlobMemCache {
2492            size_blobs: registry.register(metric!(
2493                name: "mz_persist_blob_cache_size_blobs",
2494                help: "count of blobs in the cache",
2495                const_labels: {"cache" => "mem"},
2496            )),
2497            size_bytes: registry.register(metric!(
2498                name: "mz_persist_blob_cache_size_bytes",
2499                help: "total size of blobs in the cache",
2500                const_labels: {"cache" => "mem"},
2501            )),
2502            hits_blobs: registry.register(metric!(
2503                name: "mz_persist_blob_cache_hits_blobs",
2504                help: "count of blobs served via cache instead of s3",
2505                const_labels: {"cache" => "mem"},
2506            )),
2507            hits_bytes: registry.register(metric!(
2508                name: "mz_persist_blob_cache_hits_bytes",
2509                help: "total size of blobs served via cache instead of s3",
2510                const_labels: {"cache" => "mem"},
2511            )),
2512            evictions: registry.register(metric!(
2513                name: "mz_persist_blob_cache_evictions",
2514                help: "count of capacity-based cache evictions",
2515                const_labels: {"cache" => "mem"},
2516            )),
2517        }
2518    }
2519}
2520
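/// Semaphores used to bound the resource usage of persist internals (currently
/// just fetches), initialized lazily so their sizes can reflect synced dyncfg
/// values.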
2521#[derive(Debug)]
2522pub struct SemaphoreMetrics {
2523    cfg: PersistConfig,
2524    registry: MetricsRegistry,
2525    fetch: OnceCell<MetricsSemaphore>,
2526}
2527
2528impl SemaphoreMetrics {
2529    fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
2530        SemaphoreMetrics {
2531            cfg,
2532            registry,
2533            fetch: OnceCell::new(),
2534        }
2535    }
2536
2537    /// We can't easily change the number of permits after creation, and the
2538    /// dyncfgs are all set to defaults on process start, so initialize the
2539    /// semaphore only after we've synced dyncfgs at least once.
2540    async fn fetch(&self) -> &MetricsSemaphore {
2541        if let Some(x) = self.fetch.get() {
2542            // Common case of already initialized avoids the cloning below.
2543            return x;
2544        }
2545        let cfg = self.cfg.clone();
2546        let registry = self.registry.clone();
2547        let init = async move {
2548            let total_permits = match cfg.announce_memory_limit {
2549                // Non-cc replicas have the old physical flow control mechanism,
2550                // so only apply this one on cc replicas.
2551                Some(mem) if cfg.is_cc_active => {
2552                    // We can't easily adjust the number of permits later, so
2553                    // make sure we've synced dyncfg values at least once.
2554                    info!("fetch semaphore awaiting first dyncfg values");
2555                    let () = cfg.configs_synced_once().await;
2556                    let total_permits = usize::cast_lossy(
2557                        f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
2558                    );
2559                    info!("fetch semaphore got first dyncfg values");
2560                    total_permits
2561                }
2562                Some(_) | None => Semaphore::MAX_PERMITS,
2563            };
2564            MetricsSemaphore::new(&registry, "fetch", total_permits)
2565        };
2566        self.fetch.get_or_init(|| init).await
2567    }
2568
2569    pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
2570        // Adjust the requested permits to account for the difference between
2571        // encoded_size_bytes and the decoded size in lgalloc.
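        // (Illustrative arithmetic only: with a hypothetical adjustment of
        // 1.5, a part with an encoded size of 1 MiB would request 1.5 MiB
        // worth of permits.)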
2572        let requested_permits = f64::cast_lossy(encoded_size_bytes);
2573        let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
2574        let requested_permits = usize::cast_lossy(requested_permits);
2575        self.fetch().await.acquire_permits(requested_permits).await
2576    }
2577}
2578
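/// A [Semaphore] wrapper that reports acquire counts, blocking counts and
/// durations, and permit flow as Prometheus metrics.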
2579#[derive(Debug)]
2580pub struct MetricsSemaphore {
2581    name: &'static str,
2582    semaphore: Arc<Semaphore>,
2583    total_permits: usize,
2584    acquire_count: IntCounter,
2585    blocking_count: IntCounter,
2586    blocking_seconds: Counter,
2587    acquired_permits: IntCounter,
2588    released_permits: IntCounter,
2589    _available_permits: ComputedUIntGauge,
2590}
2591
2592impl MetricsSemaphore {
2593    pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
2594        let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
2595        // TODO: Sadly, tokio::sync::Semaphore makes it difficult to have a
2596        // dynamic total_permits count.
2597        let semaphore = Arc::new(Semaphore::new(total_permits));
2598        MetricsSemaphore {
2599            name,
2600            total_permits,
2601            acquire_count: registry.register(metric!(
2602                name: "mz_persist_semaphore_acquire_count",
2603                help: "count of acquire calls (not acquired permits count)",
2604                const_labels: {"name" => name},
2605            )),
2606            blocking_count: registry.register(metric!(
2607                name: "mz_persist_semaphore_blocking_count",
2608                help: "count of acquire calls that had to block",
2609                const_labels: {"name" => name},
2610            )),
2611            blocking_seconds: registry.register(metric!(
2612                name: "mz_persist_semaphore_blocking_seconds",
2613                help: "total time spent blocking on permit acquisition",
2614                const_labels: {"name" => name},
2615            )),
2616            acquired_permits: registry.register(metric!(
2617                name: "mz_persist_semaphore_acquired_permits",
2618                help: "total sum of acquired permits",
2619                const_labels: {"name" => name},
2620            )),
2621            released_permits: registry.register(metric!(
2622                name: "mz_persist_semaphore_released_permits",
2623                help: "total sum of released permits",
2624                const_labels: {"name" => name},
2625            )),
2626            _available_permits: registry.register_computed_gauge(
2627                metric!(
2628                    name: "mz_persist_semaphore_available_permits",
2629                    help: "currently available permits according to the semaphore",
2630                ),
2631                {
2632                    let semaphore = Arc::clone(&semaphore);
2633                    move || u64::cast_from(semaphore.available_permits())
2634                },
2635            ),
2636            semaphore,
2637        }
2638    }
2639
2640    pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
2641        // HACK: Cap the request at the total permit count. This prevents
2642        // deadlock, even if the cfg gets set to some small value.
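        // (For example, a request for 200 permits against a semaphore sized at
        // 100 total permits is clamped to 100, so it can still be granted.)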
2643        let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
2644        let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
2645        let requested_permits = std::cmp::min(requested_permits, total_permits);
2646        let wrap = |_permit| {
2647            self.acquired_permits.inc_by(u64::from(requested_permits));
2648            MetricsPermits {
2649                _permit,
2650                released_metric: self.released_permits.clone(),
2651                count: requested_permits,
2652            }
2653        };
2654
2655        // Special-case non-blocking happy path.
2656        self.acquire_count.inc();
2657        match Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
2658            Ok(x) => return wrap(x),
2659            Err(_) => {}
2660        };
2661
2662        // Sad path, gotta block.
2663        self.blocking_count.inc();
2664        let start = Instant::now();
2665        let ret = Arc::clone(&self.semaphore)
2666            .acquire_many_owned(requested_permits)
2667            .instrument(info_span!("acquire_permits"))
2668            .await;
2669        let elapsed = start.elapsed();
2670        self.blocking_seconds.inc_by(elapsed.as_secs_f64());
2671        debug!(
2672            "acquisition of {} {} permits blocked for {:?}",
2673            self.name, requested_permits, elapsed
2674        );
2675        wrap(ret.expect("semaphore is never closed"))
2676    }
2677}
2678
2679#[derive(Debug)]
2680pub struct MetricsPermits {
2681    _permit: OwnedSemaphorePermit,
2682    released_metric: IntCounter,
2683    count: u32,
2684}
2685
2686impl Drop for MetricsPermits {
2687    fn drop(&mut self) {
2688        self.released_metric.inc_by(u64::from(self.count))
2689    }
2690}
2691
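/// Metrics for a single kind of external operation (e.g. one [Blob] or
/// [Consensus] method).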
2692#[derive(Debug)]
2693pub struct ExternalOpMetrics {
2694    started: IntCounter,
2695    succeeded: IntCounter,
2696    failed: IntCounter,
2697    bytes: IntCounter,
2698    seconds: Counter,
2699    seconds_histogram: Option<Histogram>,
2700    alerts_metrics: Arc<AlertsMetrics>,
2701}
2702
2703impl ExternalOpMetrics {
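    /// Runs `op_fn`, recording the started/succeeded/failed counts and elapsed
    /// seconds for this op; on error, additionally invokes `on_err_fn` with
    /// the alert metrics before returning the result unchanged.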
2704    async fn run_op<R, F, OpFn, ErrFn>(
2705        &self,
2706        op_fn: OpFn,
2707        on_err_fn: ErrFn,
2708    ) -> Result<R, ExternalError>
2709    where
2710        F: std::future::Future<Output = Result<R, ExternalError>>,
2711        OpFn: FnOnce() -> F,
2712        ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
2713    {
2714        self.started.inc();
2715        let start = Instant::now();
2716        let res = op_fn().await;
2717        let elapsed_seconds = start.elapsed().as_secs_f64();
2718        self.seconds.inc_by(elapsed_seconds);
2719        if let Some(h) = &self.seconds_histogram {
2720            h.observe(elapsed_seconds);
2721        }
2722        match res.as_ref() {
2723            Ok(_) => self.succeeded.inc(),
2724            Err(err) => {
2725                self.failed.inc();
2726                on_err_fn(&self.alerts_metrics, err);
2727            }
2728        };
2729        res
2730    }
2731
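    /// Like `run_op`, but for ops that return a stream: each item is passed
    /// through, `on_err_fn` fires for each erroring item, and the
    /// success/failure counts and elapsed seconds are recorded once the stream
    /// is exhausted.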
2732    fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
2733        &'a self,
2734        op_fn: OpFn,
2735        mut on_err_fn: ErrFn,
2736    ) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
2737    where
2738        S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
2739        OpFn: FnOnce() -> S,
2740        ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
2741    {
2742        self.started.inc();
2743        let start = Instant::now();
2744        let mut stream = op_fn();
2745        stream! {
2746            let mut succeeded = true;
2747            while let Some(res) = stream.next().await {
2748                if let Err(err) = res.as_ref() {
2749                    on_err_fn(&self.alerts_metrics, err);
2750                    succeeded = false;
2751                }
2752                yield res;
2753            }
2754            if succeeded {
2755                self.succeeded.inc()
2756            } else {
2757                self.failed.inc()
2758            }
2759            let elapsed_seconds = start.elapsed().as_secs_f64();
2760            self.seconds.inc_by(elapsed_seconds);
2761            if let Some(h) = &self.seconds_histogram {
2762                h.observe(elapsed_seconds);
2763            }
2764        }
2765    }
2766}
2767
2768#[derive(Debug)]
2769pub struct BlobMetrics {
2770    set: ExternalOpMetrics,
2771    get: ExternalOpMetrics,
2772    list_keys: ExternalOpMetrics,
2773    delete: ExternalOpMetrics,
2774    restore: ExternalOpMetrics,
2775    delete_noop: IntCounter,
2776    blob_sizes: Histogram,
2777    pub rtt_latency: Gauge,
2778}
2779
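/// A [Blob] implementation that instruments an inner [Blob] with the
/// [BlobMetrics] counters above.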
2780#[derive(Debug)]
2781pub struct MetricsBlob {
2782    blob: Arc<dyn Blob>,
2783    metrics: Arc<Metrics>,
2784}
2785
2786impl MetricsBlob {
2787    pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
2788        MetricsBlob { blob, metrics }
2789    }
2790
2791    fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
2792        alerts_metrics.blob_failures.inc()
2793    }
2794}
2795
2796#[async_trait]
2797impl Blob for MetricsBlob {
2798    #[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
2799    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
2800        let res = self
2801            .metrics
2802            .blob
2803            .get
2804            .run_op(|| self.blob.get(key), Self::on_err)
2805            .await;
2806        if let Ok(Some(value)) = res.as_ref() {
2807            self.metrics
2808                .blob
2809                .get
2810                .bytes
2811                .inc_by(u64::cast_from(value.len()));
2812        }
2813        res
2814    }
2815
2816    #[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
2817    async fn list_keys_and_metadata(
2818        &self,
2819        key_prefix: &str,
2820        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
2821    ) -> Result<(), ExternalError> {
2822        let mut byte_total = 0;
2823        let mut instrumented = |blob_metadata: BlobMetadata| {
2824            // Track the size of the _keys_, not the blobs, so that we get a
2825            // sense for how much network bandwidth these calls are using.
2826            byte_total += blob_metadata.key.len();
2827            f(blob_metadata)
2828        };
2829
2830        let res = self
2831            .metrics
2832            .blob
2833            .list_keys
2834            .run_op(
2835                || {
2836                    self.blob
2837                        .list_keys_and_metadata(key_prefix, &mut instrumented)
2838                },
2839                Self::on_err,
2840            )
2841            .await;
2842
2843        self.metrics
2844            .blob
2845            .list_keys
2846            .bytes
2847            .inc_by(u64::cast_from(byte_total));
2848
2849        res
2850    }
2851
2852    #[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
2853    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
2854        let bytes = value.len();
2855        let res = self
2856            .metrics
2857            .blob
2858            .set
2859            .run_op(|| self.blob.set(key, value), Self::on_err)
2860            .await;
2861        if res.is_ok() {
2862            self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
2863            self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
2864        }
2865        res
2866    }
2867
2868    #[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
2869    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
2870        let bytes = self
2871            .metrics
2872            .blob
2873            .delete
2874            .run_op(|| self.blob.delete(key), Self::on_err)
2875            .await?;
2876        if let Some(bytes) = bytes {
2877            self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
2878        } else {
2879            self.metrics.blob.delete_noop.inc();
2880        }
2881        Ok(bytes)
2882    }
2883
2884    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
2885        self.metrics
2886            .blob
2887            .restore
2888            .run_op(|| self.blob.restore(key), Self::on_err)
2889            .await
2890    }
2891}
2892
2893#[derive(Debug)]
2894pub struct ConsensusMetrics {
2895    list_keys: ExternalOpMetrics,
2896    head: ExternalOpMetrics,
2897    compare_and_set: ExternalOpMetrics,
2898    scan: ExternalOpMetrics,
2899    truncate: ExternalOpMetrics,
2900    truncated_count: IntCounter,
2901    pub rtt_latency: Gauge,
2902}
2903
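/// A [Consensus] implementation that instruments an inner [Consensus] with the
/// [ConsensusMetrics] counters above.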
2904#[derive(Debug)]
2905pub struct MetricsConsensus {
2906    consensus: Arc<dyn Consensus>,
2907    metrics: Arc<Metrics>,
2908}
2909
2910impl MetricsConsensus {
2911    pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
2912        MetricsConsensus { consensus, metrics }
2913    }
2914
2915    fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
2916        // As of 2022-09-06, regular determinate errors are expected in
2917        // Consensus (i.e. "txn conflict, please retry"), so only count the
2918        // indeterminate ones.
2919        if let ExternalError::Indeterminate(_) = err {
2920            alerts_metrics.consensus_failures.inc()
2921        }
2922    }
2923}
2924
2925#[async_trait]
2926impl Consensus for MetricsConsensus {
2927    fn list_keys(&self) -> ResultStream<'_, String> {
2928        Box::pin(
2929            self.metrics
2930                .consensus
2931                .list_keys
2932                .run_stream(|| self.consensus.list_keys(), Self::on_err),
2933        )
2934    }
2935
2936    #[instrument(name = "consensus::head", fields(shard=key))]
2937    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
2938        let res = self
2939            .metrics
2940            .consensus
2941            .head
2942            .run_op(|| self.consensus.head(key), Self::on_err)
2943            .await;
2944        if let Ok(Some(data)) = res.as_ref() {
2945            self.metrics
2946                .consensus
2947                .head
2948                .bytes
2949                .inc_by(u64::cast_from(data.data.len()));
2950        }
2951        res
2952    }
2953
2954    #[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
2955    async fn compare_and_set(
2956        &self,
2957        key: &str,
2958        expected: Option<SeqNo>,
2959        new: VersionedData,
2960    ) -> Result<CaSResult, ExternalError> {
2961        let bytes = new.data.len();
2962        let res = self
2963            .metrics
2964            .consensus
2965            .compare_and_set
2966            .run_op(
2967                || self.consensus.compare_and_set(key, expected, new),
2968                Self::on_err,
2969            )
2970            .await;
2971        match res.as_ref() {
2972            Ok(CaSResult::Committed) => self
2973                .metrics
2974                .consensus
2975                .compare_and_set
2976                .bytes
2977                .inc_by(u64::cast_from(bytes)),
2978            Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
2979        }
2980        res
2981    }
2982
2983    #[instrument(name = "consensus::scan", fields(shard=key))]
2984    async fn scan(
2985        &self,
2986        key: &str,
2987        from: SeqNo,
2988        limit: usize,
2989    ) -> Result<Vec<VersionedData>, ExternalError> {
2990        let res = self
2991            .metrics
2992            .consensus
2993            .scan
2994            .run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
2995            .await;
2996        if let Ok(dataz) = res.as_ref() {
2997            let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
2998            self.metrics
2999                .consensus
3000                .scan
3001                .bytes
3002                .inc_by(u64::cast_from(bytes));
3003        }
3004        res
3005    }
3006
3007    #[instrument(name = "consensus::truncate", fields(shard=key))]
3008    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
3009        let metrics = &self.metrics.consensus;
3010        let deleted = metrics
3011            .truncate
3012            .run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
3013            .await?;
3014        if let Some(deleted) = deleted {
3015            metrics.truncated_count.inc_by(u64::cast_from(deleted));
3016        }
3017        Ok(deleted)
3018    }
3019}
3020
3021/// A standard set of metrics for an async task. Call [TaskMetrics::instrument_task] to instrument
3022/// a future and report its metrics for this task type.
3023#[derive(Debug, Clone)]
3024pub struct TaskMetrics {
3025    f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
3026    u64_gauges: Vec<(
3027        GenericGauge<AtomicU64>,
3028        fn(&tokio_metrics::TaskMetrics) -> u64,
3029    )>,
3030    monitor: TaskMonitor,
3031}
3032
3033impl TaskMetrics {
3034    pub fn new(name: &str) -> Self {
3035        let monitor = TaskMonitor::new();
3036        Self {
3037            f64_gauges: vec![
3038                (
3039                    Gauge::make_collector(metric!(
3040                        name: "mz_persist_task_total_idle_duration",
3041                        help: "Seconds of time spent idling, i.e., waiting for a task to be woken up.",
3042                        const_labels: {"name" => name}
3043                    )),
3044                    |m| m.total_idle_duration.as_secs_f64(),
3045                ),
3046                (
3047                    Gauge::make_collector(metric!(
3048                        name: "mz_persist_task_total_scheduled_duration",
3049                        help: "Seconds of time spent scheduled, i.e., ready to poll but not yet polled.",
3050                        const_labels: {"name" => name}
3051                    )),
3052                    |m| m.total_scheduled_duration.as_secs_f64(),
3053                ),
3054            ],
3055            u64_gauges: vec![
3056                (
3057                    MakeCollector::make_collector(metric!(
3058                        name: "mz_persist_task_total_scheduled_count",
3059                        help: "The total number of task schedules. Useful for computing the average scheduled time.",
3060                        const_labels: {"name" => name}
3061                    )),
3062                    |m| m.total_scheduled_count,
3063                ),
3064                (
3065                    MakeCollector::make_collector(metric!(
3066                        name: "mz_persist_task_total_idled_count",
3067                        help: "The total number of task idles. Useful for computing the average idle time.",
3068                        const_labels: {"name" => name},
3069                    )),
3071                    |m| m.total_idled_count,
3072                ),
3073            ],
3074            monitor,
3075        }
3076    }
3077
3078    /// Instrument the provided future. The expectation is that the result will be executed
3079    /// as a task. (See [TaskMonitor::instrument] for more context.)
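    ///
    /// A minimal usage sketch (illustrative only; `task_metrics` is a
    /// hypothetical [TaskMetrics] handle):
    ///
    /// ```ignore
    /// let task = task_metrics.instrument_task(async { /* do the work */ });
    /// tokio::spawn(task);
    /// ```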
3080    pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
3081        TaskMonitor::instrument(&self.monitor, task)
3082    }
3083}
3084
3085impl Collector for TaskMetrics {
3086    fn desc(&self) -> Vec<&Desc> {
3087        let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3088        for (g, _) in &self.f64_gauges {
3089            descs.extend(g.desc());
3090        }
3091        for (g, _) in &self.u64_gauges {
3092            descs.extend(g.desc());
3093        }
3094        descs
3095    }
3096
3097    fn collect(&self) -> Vec<MetricFamily> {
3098        let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3099        let metrics = self.monitor.cumulative();
3100        for (g, metrics_fn) in &self.f64_gauges {
3101            g.set(metrics_fn(&metrics));
3102            families.extend(g.collect());
3103        }
3104        for (g, metrics_fn) in &self.u64_gauges {
3105            g.set(metrics_fn(&metrics));
3106            families.extend(g.collect());
3107        }
3108        families
3109    }
3110}
3111
3112#[derive(Debug)]
3113pub struct TasksMetrics {
3114    pub heartbeat_read: TaskMetrics,
3115}
3116
3117impl TasksMetrics {
3118    fn new(registry: &MetricsRegistry) -> Self {
3119        let heartbeat_read = TaskMetrics::new("heartbeat_read");
3120        registry.register_collector(heartbeat_read.clone());
3121        TasksMetrics { heartbeat_read }
3122    }
3123}
3124
3125#[derive(Debug)]
3126pub struct SchemaMetrics {
3127    pub(crate) cache_fetch_state_count: IntCounter,
3128    pub(crate) cache_schema: SchemaCacheMetrics,
3129    pub(crate) cache_migration: SchemaCacheMetrics,
3130    pub(crate) migration_count_same: IntCounter,
3131    pub(crate) migration_count_codec: IntCounter,
3132    pub(crate) migration_count_either: IntCounter,
3133    pub(crate) migration_len_legacy_codec: IntCounter,
3134    pub(crate) migration_len_either_codec: IntCounter,
3135    pub(crate) migration_len_either_arrow: IntCounter,
3136    pub(crate) migration_new_count: IntCounter,
3137    pub(crate) migration_new_seconds: Counter,
3138    pub(crate) migration_migrate_seconds: Counter,
3139}
3140
3141impl SchemaMetrics {
3142    fn new(registry: &MetricsRegistry) -> Self {
3143        let cached: IntCounterVec = registry.register(metric!(
3144            name: "mz_persist_schema_cache_cached_count",
3145            help: "count of schema cache entries served from cache",
3146            var_labels: ["op"],
3147        ));
3148        let computed: IntCounterVec = registry.register(metric!(
3149            name: "mz_persist_schema_cache_computed_count",
3150            help: "count of schema cache entries computed",
3151            var_labels: ["op"],
3152        ));
3153        let unavailable: IntCounterVec = registry.register(metric!(
3154            name: "mz_persist_schema_cache_unavailable_count",
3155            help: "count of schema cache entries unavailable at the current state",
3156            var_labels: ["op"],
3157        ));
3158        let added: IntCounterVec = registry.register(metric!(
3159            name: "mz_persist_schema_cache_added_count",
3160            help: "count of schema cache entries added",
3161            var_labels: ["op"],
3162        ));
3163        let dropped: IntCounterVec = registry.register(metric!(
3164            name: "mz_persist_schema_cache_dropped_count",
3165            help: "count of schema cache entries dropped",
3166            var_labels: ["op"],
3167        ));
3168        let cache = |name| SchemaCacheMetrics {
3169            cached_count: cached.with_label_values(&[name]),
3170            computed_count: computed.with_label_values(&[name]),
3171            unavailable_count: unavailable.with_label_values(&[name]),
3172            added_count: added.with_label_values(&[name]),
3173            dropped_count: dropped.with_label_values(&[name]),
3174        };
3175        let migration_count: IntCounterVec = registry.register(metric!(
3176            name: "mz_persist_schema_migration_count",
3177            help: "count of fetch part migrations",
3178            var_labels: ["op"],
3179        ));
3180        let migration_len: IntCounterVec = registry.register(metric!(
3181            name: "mz_persist_schema_migration_len",
3182            help: "count of migrated update records",
3183            var_labels: ["op"],
3184        ));
3185        SchemaMetrics {
3186            cache_fetch_state_count: registry.register(metric!(
3187                name: "mz_persist_schema_cache_fetch_state_count",
3188                help: "count of state fetches by the schema cache",
3189            )),
3190            cache_schema: cache("schema"),
3191            cache_migration: cache("migration"),
3192            migration_count_same: migration_count.with_label_values(&["same"]),
3193            migration_count_codec: migration_count.with_label_values(&["codec"]),
3194            migration_count_either: migration_count.with_label_values(&["either"]),
3195            migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
3196            migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
3197            migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
3198            migration_new_count: registry.register(metric!(
3199                name: "mz_persist_schema_migration_new_count",
3200                help: "count of migrations constructed",
3201            )),
3202            migration_new_seconds: registry.register(metric!(
3203                name: "mz_persist_schema_migration_new_seconds",
3204                help: "seconds spent constructing migration logic",
3205            )),
3206            migration_migrate_seconds: registry.register(metric!(
3207                name: "mz_persist_schema_migration_migrate_seconds",
3208                help: "seconds spent applying migration logic",
3209            )),
3210        }
3211    }
3212}
3213
3214#[derive(Debug, Clone)]
3215pub struct SchemaCacheMetrics {
3216    pub(crate) cached_count: IntCounter,
3217    pub(crate) computed_count: IntCounter,
3218    pub(crate) unavailable_count: IntCounter,
3219    pub(crate) added_count: IntCounter,
3220    pub(crate) dropped_count: IntCounter,
3221}
3222
3223#[derive(Debug)]
3224pub struct InlineMetrics {
3225    pub(crate) part_commit_count: IntCounter,
3226    pub(crate) part_commit_bytes: IntCounter,
3227    pub(crate) backpressure: BatchWriteMetrics,
3228}
3229
3230impl InlineMetrics {
3231    fn new(registry: &MetricsRegistry) -> Self {
3232        InlineMetrics {
3233            part_commit_count: registry.register(metric!(
3234                name: "mz_persist_inline_part_commit_count",
3235                help: "count of inline parts committed to state",
3236            )),
3237            part_commit_bytes: registry.register(metric!(
3238                name: "mz_persist_inline_part_commit_bytes",
3239                help: "total size of inline parts committed to state",
3240            )),
3241            backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
3242        }
3243    }
3244}
3245
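/// Parses the shard id out of a blob key, if possible; used to tag the tracing
/// spans on the [MetricsBlob] methods above.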
3246fn blob_key_shard_id(key: &str) -> Option<String> {
3247    let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
3248    Some(shard_id.to_string())
3249}
3250
3251/// Encode a frontier into an i64 acceptable for use in metrics.
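///
/// A sketch of the intended behavior, assuming mz's usual `u64` timestamps
/// (whose Codec64 encoding is little-endian, per the comment below):
///
/// ```ignore
/// let present = Antichain::from_elem(5u64);
/// assert_eq!(encode_ts_metric(&present), 5);
/// // The empty antichain (no more updates) maps to i64::MAX.
/// assert_eq!(encode_ts_metric(&Antichain::<u64>::new()), i64::MAX);
/// ```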
3252pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
3253    // We have two problems in mapping a persist frontier into a metric.
3254    // First is that we only have a `T: Timestamp+Codec64`. Second, is
3255    // mapping an antichain to a single counter value. We solve both by
3256    // taking advantage of the fact that in practice, timestamps in mz are
3257    // currently always a u64 (and if we switch them, it will be to an i64).
3258    // This means that for all values that mz would actually produce,
3259    // interpreting the encoded bytes as a little-endian i64 will work.
3260    // Both u64 and i64 are totally ordered, so in practice, there will always
3261    // be zero or one elements in the antichain.
3262    match ts.elements().first() {
3263        Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
3264        None => i64::MAX,
3265    }
3266}