Skip to main content

mz_persist_client/internal/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Prometheus monitoring metrics.
11
12use async_stream::stream;
13use mz_persist_types::stats::PartStatsMetrics;
14use std::collections::BTreeMap;
15use std::sync::{Arc, Mutex, Weak};
16use std::time::{Duration, Instant};
17use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::instrument;
25use mz_ore::metric;
26use mz_ore::metrics::{
27    ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
28    DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
29    UIntGaugeVec, raw,
30};
31use mz_ore::stats::histogram_seconds_buckets;
32use mz_persist::location::{
33    Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
34};
35use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
36use mz_persist::retry::RetryStream;
37use mz_persist_types::Codec64;
38use mz_postgres_client::metrics::PostgresClientMetrics;
39use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
40use prometheus::proto::MetricFamily;
41use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
42use timely::progress::Antichain;
43use tokio_metrics::TaskMonitor;
44use tracing::{Instrument, debug, info, info_span};
45
46use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
47use crate::internal::paths::BlobKey;
48use crate::{PersistConfig, ShardId};
49
50/// Prometheus monitoring metrics.
51///
52/// Intentionally not Clone because we expect this to be passed around in an
53/// Arc.
54pub struct Metrics {
55    _vecs: MetricsVecs,
56    _uptime: ComputedGauge,
57
58    /// Metrics for [Blob] usage.
59    pub blob: BlobMetrics,
60    /// Metrics for [Consensus] usage.
61    pub consensus: ConsensusMetrics,
62    /// Metrics of command evaluation.
63    pub cmds: CmdsMetrics,
64    /// Metrics for each retry loop.
65    pub retries: RetriesMetrics,
66    /// Metrics for batches written directly on behalf of a user (BatchBuilder
67    /// or one of the sugar methods that use it).
68    pub user: BatchWriteMetrics,
69    /// Metrics for reading batch parts
70    pub read: BatchPartReadMetrics,
71    /// Metrics for compaction.
72    pub compaction: CompactionMetrics,
73    /// Metrics for garbage collection.
74    pub gc: GcMetrics,
75    /// Metrics for leasing and automatic lease expiry.
76    pub lease: LeaseMetrics,
77    /// Metrics for various encodings and decodings.
78    pub codecs: CodecsMetrics,
79    /// Metrics for (incremental) state updates and fetches.
80    pub state: StateMetrics,
81    /// Metrics for various per-shard measurements.
82    pub shards: ShardsMetrics,
83    /// Metrics for auditing persist usage
84    pub audit: UsageAuditMetrics,
85    /// Metrics for locking.
86    pub locks: LocksMetrics,
87    /// Metrics for StateWatch.
88    pub watch: WatchMetrics,
89    /// Metrics for PubSub client.
90    pub pubsub_client: PubSubClientMetrics,
91    /// Metrics for mfp/filter pushdown.
92    pub pushdown: PushdownMetrics,
93    /// Metrics for consolidation.
94    pub consolidation: ConsolidationMetrics,
95    /// Metrics for blob caching.
96    pub blob_cache_mem: BlobMemCache,
97    /// Metrics for tokio tasks.
98    pub tasks: TasksMetrics,
99    /// Metrics for columnar data encoding and decoding.
100    pub columnar: ColumnarMetrics,
101    /// Metrics for schemas and the schema registry.
102    pub schema: SchemaMetrics,
103    /// Metrics for inline writes.
104    pub inline: InlineMetrics,
105    /// Semaphore to limit memory/disk use by fetches.
106    pub(crate) semaphore: SemaphoreMetrics,
107
108    /// Metrics for the persist sink.
109    pub sink: SinkMetrics,
110
111    /// Metrics for S3-backed blob implementation
112    pub s3_blob: S3BlobMetrics,
113    /// Metrics for Postgres-backed consensus implementation
114    pub postgres_consensus: PostgresClientMetrics,
115
116    #[allow(dead_code)]
117    pub(crate) registry: MetricsRegistry,
118}
119
120impl std::fmt::Debug for Metrics {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("Metrics").finish_non_exhaustive()
123    }
124}
125
126impl Metrics {
127    /// Returns a new [Metrics] instance connected to the given registry.
128    pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
129        let vecs = MetricsVecs::new(registry);
130        let start = Instant::now();
131        let uptime = registry.register_computed_gauge(
132            metric!(
133                name: "mz_persist_metadata_seconds",
134                help: "server uptime, labels are build metadata",
135                const_labels: {
136                    "version" => cfg.build_version,
137                    "build_type" => if cfg!(release) { "release" } else { "debug" }
138                },
139            ),
140            move || start.elapsed().as_secs_f64(),
141        );
142        let s3_blob = S3BlobMetrics::new(registry);
143        let columnar = ColumnarMetrics::new(registry);
144        Metrics {
145            blob: vecs.blob_metrics(),
146            consensus: vecs.consensus_metrics(),
147            cmds: vecs.cmds_metrics(registry),
148            retries: vecs.retries_metrics(),
149            codecs: vecs.codecs_metrics(),
150            user: BatchWriteMetrics::new(registry, "user"),
151            read: vecs.batch_part_read_metrics(),
152            compaction: CompactionMetrics::new(registry),
153            gc: GcMetrics::new(registry),
154            lease: LeaseMetrics::new(registry),
155            state: StateMetrics::new(registry),
156            shards: ShardsMetrics::new(registry),
157            audit: UsageAuditMetrics::new(registry),
158            locks: vecs.locks_metrics(),
159            watch: WatchMetrics::new(registry),
160            pubsub_client: PubSubClientMetrics::new(registry),
161            pushdown: PushdownMetrics::new(registry),
162            consolidation: ConsolidationMetrics::new(registry),
163            blob_cache_mem: BlobMemCache::new(registry),
164            tasks: TasksMetrics::new(registry),
165            columnar,
166            schema: SchemaMetrics::new(registry),
167            inline: InlineMetrics::new(registry),
168            semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
169            sink: SinkMetrics::new(registry),
170            s3_blob,
171            postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
172            _vecs: vecs,
173            _uptime: uptime,
174            registry: registry.clone(),
175        }
176    }
177
178    /// Returns the current lifetime write amplification reflected in these
179    /// metrics.
180    ///
181    /// Only exposed for tests, persistcli, and benchmarks.
182    pub fn write_amplification(&self) -> f64 {
183        // This intentionally uses "bytes" for total and "goodbytes" for user so
184        // that the overhead of our blob format is included.
185        let total_written = self.blob.set.bytes.get();
186        let user_written = self.user.goodbytes.get();
187        #[allow(clippy::as_conversions)]
188        {
189            total_written as f64 / user_written as f64
190        }
191    }
192}
193
194#[derive(Debug)]
195struct MetricsVecs {
196    cmd_started: IntCounterVec,
197    cmd_cas_mismatch: IntCounterVec,
198    cmd_succeeded: IntCounterVec,
199    cmd_failed: IntCounterVec,
200    cmd_seconds: CounterVec,
201
202    external_op_started: IntCounterVec,
203    external_op_succeeded: IntCounterVec,
204    external_op_failed: IntCounterVec,
205    external_op_bytes: IntCounterVec,
206    external_op_seconds: CounterVec,
207    external_consensus_truncated_count: IntCounter,
208    external_blob_delete_noop_count: IntCounter,
209    external_blob_sizes: Histogram,
210    external_rtt_latency: GaugeVec,
211    external_op_latency: HistogramVec,
212
213    retry_started: IntCounterVec,
214    retry_finished: IntCounterVec,
215    retry_retries: IntCounterVec,
216    retry_sleep_seconds: CounterVec,
217
218    encode_count: IntCounterVec,
219    encode_seconds: CounterVec,
220    decode_count: IntCounterVec,
221    decode_seconds: CounterVec,
222
223    read_part_bytes: IntCounterVec,
224    read_part_goodbytes: IntCounterVec,
225    read_part_count: IntCounterVec,
226    read_part_seconds: CounterVec,
227    read_ts_rewrite: IntCounterVec,
228
229    lock_acquire_count: IntCounterVec,
230    lock_blocking_acquire_count: IntCounterVec,
231    lock_blocking_seconds: CounterVec,
232
233    /// A minimal set of metrics imported into honeycomb for alerting.
234    alerts_metrics: Arc<AlertsMetrics>,
235}
236
237impl MetricsVecs {
238    fn new(registry: &MetricsRegistry) -> Self {
239        MetricsVecs {
240            cmd_started: registry.register(metric!(
241                name: "mz_persist_cmd_started_count",
242                help: "count of commands started",
243                var_labels: ["cmd"],
244            )),
245            cmd_cas_mismatch: registry.register(metric!(
246                name: "mz_persist_cmd_cas_mismatch_count",
247                help: "count of command retries from CaS mismatch",
248                var_labels: ["cmd"],
249            )),
250            cmd_succeeded: registry.register(metric!(
251                name: "mz_persist_cmd_succeeded_count",
252                help: "count of commands succeeded",
253                var_labels: ["cmd"],
254            )),
255            cmd_failed: registry.register(metric!(
256                name: "mz_persist_cmd_failed_count",
257                help: "count of commands failed",
258                var_labels: ["cmd"],
259            )),
260            cmd_seconds: registry.register(metric!(
261                name: "mz_persist_cmd_seconds",
262                help: "time spent applying commands",
263                var_labels: ["cmd"],
264            )),
265
266            external_op_started: registry.register(metric!(
267                name: "mz_persist_external_started_count",
268                help: "count of external service calls started",
269                var_labels: ["op"],
270            )),
271            external_op_succeeded: registry.register(metric!(
272                name: "mz_persist_external_succeeded_count",
273                help: "count of external service calls succeeded",
274                var_labels: ["op"],
275            )),
276            external_op_failed: registry.register(metric!(
277                name: "mz_persist_external_failed_count",
278                help: "count of external service calls failed",
279                var_labels: ["op"],
280            )),
281            external_op_bytes: registry.register(metric!(
282                name: "mz_persist_external_bytes_count",
283                help: "total size represented by external service calls",
284                var_labels: ["op"],
285            )),
286            external_op_seconds: registry.register(metric!(
287                name: "mz_persist_external_seconds",
288                help: "time spent in external service calls",
289                var_labels: ["op"],
290            )),
291            external_consensus_truncated_count: registry.register(metric!(
292                name: "mz_persist_external_consensus_truncated_count",
293                help: "count of versions deleted by consensus truncate calls",
294            )),
295            external_blob_delete_noop_count: registry.register(metric!(
296                name: "mz_persist_external_blob_delete_noop_count",
297                help: "count of blob delete calls that deleted a non-existent key",
298            )),
299            external_blob_sizes: registry.register(metric!(
300                name: "mz_persist_external_blob_sizes",
301                help: "histogram of blob sizes at put time",
302                buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
303            )),
304            external_rtt_latency: registry.register(metric!(
305                name: "mz_persist_external_rtt_latency",
306                help: "roundtrip-time to external service as seen by this process",
307                var_labels: ["external"],
308            )),
309            external_op_latency: registry.register(metric!(
310                name: "mz_persist_external_op_latency",
311                help: "rountrip latency observed by individual performance-critical operations",
312                var_labels: ["op"],
313                // NB: If we end up overrunning metrics quotas, we could plausibly cut this
314                // down by switching to a factor of 4 between buckets (vs. the standard 2).
315                buckets: histogram_seconds_buckets(0.000_500, 32.0),
316            )),
317
318            retry_started: registry.register(metric!(
319                name: "mz_persist_retry_started_count",
320                help: "count of retry loops started",
321                var_labels: ["op"],
322            )),
323            retry_finished: registry.register(metric!(
324                name: "mz_persist_retry_finished_count",
325                help: "count of retry loops finished",
326                var_labels: ["op"],
327            )),
328            retry_retries: registry.register(metric!(
329                name: "mz_persist_retry_retries_count",
330                help: "count of total attempts by retry loops",
331                var_labels: ["op"],
332            )),
333            retry_sleep_seconds: registry.register(metric!(
334                name: "mz_persist_retry_sleep_seconds",
335                help: "time spent in retry loop backoff",
336                var_labels: ["op"],
337            )),
338
339            encode_count: registry.register(metric!(
340                name: "mz_persist_encode_count",
341                help: "count of op encodes",
342                var_labels: ["op"],
343            )),
344            encode_seconds: registry.register(metric!(
345                name: "mz_persist_encode_seconds",
346                help: "time spent in op encodes",
347                var_labels: ["op"],
348            )),
349            decode_count: registry.register(metric!(
350                name: "mz_persist_decode_count",
351                help: "count of op decodes",
352                var_labels: ["op"],
353            )),
354            decode_seconds: registry.register(metric!(
355                name: "mz_persist_decode_seconds",
356                help: "time spent in op decodes",
357                var_labels: ["op"],
358            )),
359
360            read_part_bytes: registry.register(metric!(
361                name: "mz_persist_read_batch_part_bytes",
362                help: "total encoded size of batch parts read",
363                var_labels: ["op"],
364            )),
365            read_part_goodbytes: registry.register(metric!(
366                name: "mz_persist_read_batch_part_goodbytes",
367                help: "total logical size of batch parts read",
368                var_labels: ["op"],
369            )),
370            read_part_count: registry.register(metric!(
371                name: "mz_persist_read_batch_part_count",
372                help: "count of batch parts read",
373                var_labels: ["op"],
374            )),
375            read_part_seconds: registry.register(metric!(
376                name: "mz_persist_read_batch_part_seconds",
377                help: "time spent reading batch parts",
378                var_labels: ["op"],
379            )),
380            read_ts_rewrite: registry.register(metric!(
381                name: "mz_persist_read_ts_rewite",
382                help: "count of updates read with rewritten ts",
383                var_labels: ["op"],
384            )),
385
386            lock_acquire_count: registry.register(metric!(
387                name: "mz_persist_lock_acquire_count",
388                help: "count of locks acquired",
389                var_labels: ["op"],
390            )),
391            lock_blocking_acquire_count: registry.register(metric!(
392                name: "mz_persist_lock_blocking_acquire_count",
393                help: "count of locks acquired that required blocking",
394                var_labels: ["op"],
395            )),
396            lock_blocking_seconds: registry.register(metric!(
397                name: "mz_persist_lock_blocking_seconds",
398                help: "time spent blocked for a lock",
399                var_labels: ["op"],
400            )),
401
402            alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
403        }
404    }
405
406    fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
407        CmdsMetrics {
408            init_state: self.cmd_metrics("init_state"),
409            add_rollup: self.cmd_metrics("add_rollup"),
410            remove_rollups: self.cmd_metrics("remove_rollups"),
411            register: self.cmd_metrics("register"),
412            compare_and_append: self.cmd_metrics("compare_and_append"),
413            compare_and_append_noop:             registry.register(metric!(
414                name: "mz_persist_cmd_compare_and_append_noop",
415                help: "count of compare_and_append retries that were discoverd to have already committed",
416            )),
417            compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
418            downgrade_since: self.cmd_metrics("downgrade_since"),
419            expire_reader: self.cmd_metrics("expire_reader"),
420            expire_writer: self.cmd_metrics("expire_writer"),
421            merge_res: self.cmd_metrics("merge_res"),
422            become_tombstone: self.cmd_metrics("become_tombstone"),
423            compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
424            spine_exert: self.cmd_metrics("spine_exert"),
425            fetch_upper_count: registry.register(metric!(
426                name: "mz_persist_cmd_fetch_upper_count",
427                help: "count of fetch_upper calls",
428            ))
429        }
430    }
431
432    fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
433        CmdMetrics {
434            name: cmd.to_owned(),
435            started: self.cmd_started.with_label_values(&[cmd]),
436            succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
437            cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
438            failed: self.cmd_failed.with_label_values(&[cmd]),
439            seconds: self.cmd_seconds.with_label_values(&[cmd]),
440        }
441    }
442
443    fn retries_metrics(&self) -> RetriesMetrics {
444        RetriesMetrics {
445            determinate: RetryDeterminate {
446                apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
447            },
448            external: RetryExternal {
449                batch_delete: Arc::new(self.retry_metrics("batch::delete")),
450                batch_set: self.retry_metrics("batch::set"),
451                blob_open: self.retry_metrics("blob::open"),
452                compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
453                consensus_open: self.retry_metrics("consensus::open"),
454                fetch_batch_get: self.retry_metrics("fetch_batch::get"),
455                fetch_state_scan: self.retry_metrics("fetch_state::scan"),
456                gc_truncate: self.retry_metrics("gc::truncate"),
457                maybe_init_cas: self.retry_metrics("maybe_init::cas"),
458                rollup_delete: self.retry_metrics("rollup::delete"),
459                rollup_get: self.retry_metrics("rollup::get"),
460                rollup_set: self.retry_metrics("rollup::set"),
461                hollow_run_get: self.retry_metrics("hollow_run::get"),
462                hollow_run_set: self.retry_metrics("hollow_run::set"),
463                storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
464            },
465            compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
466            fetch_latest_state: self.retry_metrics("fetch_latest_state"),
467            fetch_live_states: self.retry_metrics("fetch_live_states"),
468            idempotent_cmd: self.retry_metrics("idempotent_cmd"),
469            next_listen_batch: self.retry_metrics("next_listen_batch"),
470            snapshot: self.retry_metrics("snapshot"),
471        }
472    }
473
474    fn retry_metrics(&self, name: &str) -> RetryMetrics {
475        RetryMetrics {
476            name: name.to_owned(),
477            started: self.retry_started.with_label_values(&[name]),
478            finished: self.retry_finished.with_label_values(&[name]),
479            retries: self.retry_retries.with_label_values(&[name]),
480            sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
481        }
482    }
483
484    fn codecs_metrics(&self) -> CodecsMetrics {
485        CodecsMetrics {
486            state: self.codec_metrics("state"),
487            state_diff: self.codec_metrics("state_diff"),
488            batch: self.codec_metrics("batch"),
489            key: self.codec_metrics("key"),
490            val: self.codec_metrics("val"),
491        }
492    }
493
494    fn codec_metrics(&self, op: &str) -> CodecMetrics {
495        CodecMetrics {
496            encode_count: self.encode_count.with_label_values(&[op]),
497            encode_seconds: self.encode_seconds.with_label_values(&[op]),
498            decode_count: self.decode_count.with_label_values(&[op]),
499            decode_seconds: self.decode_seconds.with_label_values(&[op]),
500        }
501    }
502
503    fn blob_metrics(&self) -> BlobMetrics {
504        BlobMetrics {
505            set: self.external_op_metrics("blob_set", true),
506            get: self.external_op_metrics("blob_get", true),
507            list_keys: self.external_op_metrics("blob_list_keys", false),
508            delete: self.external_op_metrics("blob_delete", false),
509            restore: self.external_op_metrics("restore", false),
510            delete_noop: self.external_blob_delete_noop_count.clone(),
511            blob_sizes: self.external_blob_sizes.clone(),
512            rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
513        }
514    }
515
516    fn consensus_metrics(&self) -> ConsensusMetrics {
517        ConsensusMetrics {
518            list_keys: self.external_op_metrics("consensus_list_keys", false),
519            head: self.external_op_metrics("consensus_head", false),
520            compare_and_set: self.external_op_metrics("consensus_cas", true),
521            scan: self.external_op_metrics("consensus_scan", false),
522            truncate: self.external_op_metrics("consensus_truncate", false),
523            truncated_count: self.external_consensus_truncated_count.clone(),
524            rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
525        }
526    }
527
528    fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
529        ExternalOpMetrics {
530            started: self.external_op_started.with_label_values(&[op]),
531            succeeded: self.external_op_succeeded.with_label_values(&[op]),
532            failed: self.external_op_failed.with_label_values(&[op]),
533            bytes: self.external_op_bytes.with_label_values(&[op]),
534            seconds: self.external_op_seconds.with_label_values(&[op]),
535            seconds_histogram: if latency_histogram {
536                Some(self.external_op_latency.with_label_values(&[op]))
537            } else {
538                None
539            },
540            alerts_metrics: Arc::clone(&self.alerts_metrics),
541        }
542    }
543
544    fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
545        BatchPartReadMetrics {
546            listen: self.read_metrics("listen"),
547            snapshot: self.read_metrics("snapshot"),
548            batch_fetcher: self.read_metrics("batch_fetcher"),
549            compaction: self.read_metrics("compaction"),
550            unindexed: self.read_metrics("unindexed"),
551        }
552    }
553
554    fn read_metrics(&self, op: &str) -> ReadMetrics {
555        ReadMetrics {
556            part_bytes: self.read_part_bytes.with_label_values(&[op]),
557            part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
558            part_count: self.read_part_count.with_label_values(&[op]),
559            seconds: self.read_part_seconds.with_label_values(&[op]),
560            ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
561        }
562    }
563
564    fn locks_metrics(&self) -> LocksMetrics {
565        LocksMetrics {
566            applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
567            applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
568            applier_write: self.lock_metrics("applier_write"),
569            watch: self.lock_metrics("watch"),
570        }
571    }
572
573    fn lock_metrics(&self, op: &str) -> LockMetrics {
574        LockMetrics {
575            acquire_count: self.lock_acquire_count.with_label_values(&[op]),
576            blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
577            blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
578        }
579    }
580}
581
582#[derive(Debug)]
583pub struct CmdMetrics {
584    pub(crate) name: String,
585    pub(crate) started: IntCounter,
586    pub(crate) cas_mismatch: IntCounter,
587    pub(crate) succeeded: IntCounter,
588    pub(crate) failed: IntCounter,
589    pub(crate) seconds: Counter,
590}
591
592impl CmdMetrics {
593    pub async fn run_cmd<R, E, F, CmdFn>(
594        &self,
595        shard_metrics: &ShardMetrics,
596        cmd_fn: CmdFn,
597    ) -> Result<R, E>
598    where
599        F: std::future::Future<Output = Result<R, E>>,
600        CmdFn: FnOnce() -> F,
601    {
602        self.started.inc();
603        let start = Instant::now();
604        let res = cmd_fn().await;
605        self.seconds.inc_by(start.elapsed().as_secs_f64());
606        match res.as_ref() {
607            Ok(_) => {
608                self.succeeded.inc();
609                shard_metrics.cmd_succeeded.inc();
610            }
611            Err(_) => self.failed.inc(),
612        };
613        res
614    }
615}
616
617#[derive(Debug)]
618pub struct CmdsMetrics {
619    pub(crate) init_state: CmdMetrics,
620    pub(crate) add_rollup: CmdMetrics,
621    pub(crate) remove_rollups: CmdMetrics,
622    pub(crate) register: CmdMetrics,
623    pub(crate) compare_and_append: CmdMetrics,
624    pub(crate) compare_and_append_noop: IntCounter,
625    pub(crate) compare_and_downgrade_since: CmdMetrics,
626    pub(crate) downgrade_since: CmdMetrics,
627    pub(crate) expire_reader: CmdMetrics,
628    pub(crate) expire_writer: CmdMetrics,
629    pub(crate) merge_res: CmdMetrics,
630    pub(crate) become_tombstone: CmdMetrics,
631    pub(crate) compare_and_evolve_schema: CmdMetrics,
632    pub(crate) spine_exert: CmdMetrics,
633    pub(crate) fetch_upper_count: IntCounter,
634}
635
636#[derive(Debug)]
637pub struct RetryMetrics {
638    pub(crate) name: String,
639    pub(crate) started: IntCounter,
640    pub(crate) finished: IntCounter,
641    pub(crate) retries: IntCounter,
642    pub(crate) sleep_seconds: Counter,
643}
644
645impl RetryMetrics {
646    pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
647        MetricsRetryStream::new(retry, self)
648    }
649}
650
651#[derive(Debug)]
652pub struct RetryDeterminate {
653    pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
654}
655
656#[derive(Debug)]
657pub struct RetryExternal {
658    pub(crate) batch_delete: Arc<RetryMetrics>,
659    pub(crate) batch_set: RetryMetrics,
660    pub(crate) blob_open: RetryMetrics,
661    pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
662    pub(crate) consensus_open: RetryMetrics,
663    pub(crate) fetch_batch_get: RetryMetrics,
664    pub(crate) fetch_state_scan: RetryMetrics,
665    pub(crate) gc_truncate: RetryMetrics,
666    pub(crate) maybe_init_cas: RetryMetrics,
667    pub(crate) rollup_delete: RetryMetrics,
668    pub(crate) rollup_get: RetryMetrics,
669    pub(crate) rollup_set: RetryMetrics,
670    pub(crate) hollow_run_get: RetryMetrics,
671    pub(crate) hollow_run_set: RetryMetrics,
672    pub(crate) storage_usage_shard_size: RetryMetrics,
673}
674
675#[derive(Debug)]
676pub struct RetriesMetrics {
677    pub(crate) determinate: RetryDeterminate,
678    pub(crate) external: RetryExternal,
679
680    pub(crate) compare_and_append_idempotent: RetryMetrics,
681    pub(crate) fetch_latest_state: RetryMetrics,
682    pub(crate) fetch_live_states: RetryMetrics,
683    pub(crate) idempotent_cmd: RetryMetrics,
684    pub(crate) next_listen_batch: RetryMetrics,
685    pub(crate) snapshot: RetryMetrics,
686}
687
688#[derive(Debug)]
689pub struct BatchPartReadMetrics {
690    pub(crate) listen: ReadMetrics,
691    pub(crate) snapshot: ReadMetrics,
692    pub(crate) batch_fetcher: ReadMetrics,
693    pub(crate) compaction: ReadMetrics,
694    pub(crate) unindexed: ReadMetrics,
695}
696
697#[derive(Debug, Clone)]
698pub struct ReadMetrics {
699    pub(crate) part_bytes: IntCounter,
700    pub(crate) part_goodbytes: IntCounter,
701    pub(crate) part_count: IntCounter,
702    pub(crate) seconds: Counter,
703    pub(crate) ts_rewrite: IntCounter,
704}
705
706// This one is Clone in contrast to the others because it has to get moved into
707// a task.
708#[derive(Debug, Clone)]
709pub struct BatchWriteMetrics {
710    pub(crate) bytes: IntCounter,
711    pub(crate) goodbytes: IntCounter,
712    pub(crate) seconds: Counter,
713    pub(crate) write_stalls: IntCounter,
714    pub(crate) key_lower_too_big: IntCounter,
715
716    pub(crate) unordered: IntCounter,
717    pub(crate) codec_order: IntCounter,
718    pub(crate) structured_order: IntCounter,
719    _order_counts: IntCounterVec,
720
721    pub(crate) step_stats: Counter,
722    pub(crate) step_part_writing: Counter,
723    pub(crate) step_inline: Counter,
724}
725
726impl BatchWriteMetrics {
727    fn new(registry: &MetricsRegistry, name: &str) -> Self {
728        let order_counts: IntCounterVec = registry.register(metric!(
729                name: format!("mz_persist_{}_write_batch_order", name),
730                help: "count of batches by the data ordering",
731                var_labels: ["order"],
732        ));
733        let unordered = order_counts.with_label_values(&["unordered"]);
734        let codec_order = order_counts.with_label_values(&["codec"]);
735        let structured_order = order_counts.with_label_values(&["structured"]);
736
737        BatchWriteMetrics {
738            bytes: registry.register(metric!(
739                name: format!("mz_persist_{}_bytes", name),
740                help: format!("total encoded size of {} batches written", name),
741            )),
742            goodbytes: registry.register(metric!(
743                name: format!("mz_persist_{}_goodbytes", name),
744                help: format!("total logical size of {} batches written", name),
745            )),
746            seconds: registry.register(metric!(
747                name: format!("mz_persist_{}_write_batch_part_seconds", name),
748                help: format!("time spent writing {} batches", name),
749            )),
750            write_stalls: registry.register(metric!(
751                name: format!("mz_persist_{}_write_stall_count", name),
752                help: format!(
753                    "count of {} writes stalling to await max outstanding reqs",
754                    name
755                ),
756            )),
757            key_lower_too_big: registry.register(metric!(
758                name: format!("mz_persist_{}_key_lower_too_big", name),
759                help: format!(
760                    "count of {} writes that were unable to write a key lower, because the size threshold was too low",
761                    name
762                ),
763            )),
764            unordered,
765            codec_order,
766            structured_order,
767            _order_counts: order_counts,
768            step_stats: registry.register(metric!(
769                name: format!("mz_persist_{}_step_stats", name),
770                help: format!("time spent computing {} update stats", name),
771            )),
772            step_part_writing: registry.register(metric!(
773                name: format!("mz_persist_{}_step_part_writing", name),
774                help: format!("blocking time spent writing parts for {} updates", name),
775            )),
776            step_inline: registry.register(metric!(
777                name: format!("mz_persist_{}_step_inline", name),
778                help: format!("time spent encoding {} inline batches", name)
779            )),
780        }
781    }
782}
783
/// Process-wide compaction metrics, registered under the
/// `mz_persist_compaction_` prefix.
#[derive(Debug)]
pub struct CompactionMetrics {
    /// Count of total compaction requests.
    pub(crate) requested: IntCounter,
    /// Count of compaction requests dropped due to a full queue.
    pub(crate) dropped: IntCounter,
    /// Count of compaction requests dropped because compaction was disabled.
    pub(crate) disabled: IntCounter,
    /// Count of compactions skipped due to heuristics.
    pub(crate) skipped: IntCounter,
    /// Count of compactions started.
    pub(crate) started: IntCounter,
    /// Count of compactions applied to state.
    pub(crate) applied: IntCounter,
    /// Count of compactions that timed out.
    pub(crate) timed_out: IntCounter,
    /// Count of compactions failed.
    pub(crate) failed: IntCounter,
    /// Count of compactions discarded (obsolete).
    pub(crate) noop: IntCounter,
    /// Time spent in compaction.
    pub(crate) seconds: Counter,
    /// Count of compaction requests that ever blocked due to the concurrency
    /// limit.
    pub(crate) concurrency_waits: IntCounter,
    /// Time that compaction requests spent queued.
    pub(crate) queued_seconds: Counter,
    /// Count of compaction memory requirement violations.
    pub(crate) memory_violations: IntCounter,
    /// Count of runs compacted.
    pub(crate) runs_compacted: IntCounter,
    /// Count of run chunks compacted.
    pub(crate) chunks_compacted: IntCounter,
    /// Count of compactions where not all inputs were prefetched.
    pub(crate) not_all_prefetched: IntCounter,
    /// Count of compaction parts completely prefetched by the time they're
    /// needed.
    pub(crate) parts_prefetched: IntCounter,
    /// Count of compaction parts that had to be waited on.
    pub(crate) parts_waited: IntCounter,
    /// Count of compaction requests that could have used the fast-path
    /// optimization.
    pub(crate) fast_path_eligible: IntCounter,
    /// Count of compaction requests that were performed by admin tooling.
    pub(crate) admin_count: IntCounter,

    /// Count of merge results that exactly replaced a SpineBatch.
    pub(crate) applied_exact_match: IntCounter,
    /// Count of merge results that replaced a subset of a SpineBatch.
    pub(crate) applied_subset_match: IntCounter,
    /// Count of merge results that did not apply due to too many updates.
    pub(crate) not_applied_too_many_updates: IntCounter,

    /// Batch write metrics, registered with the name `compaction`.
    pub(crate) batch: BatchWriteMetrics,
    /// Time spent on individual steps of compaction.
    pub(crate) steps: CompactionStepTimings,
    /// Count of compactions, by how schema selection was done.
    pub(crate) schema_selection: CompactionSchemaSelection,

    /// The labeled `mz_persist_compaction_step_seconds` family that `steps`
    /// was built from.
    pub(crate) _steps_vec: CounterVec,
}
817
818impl CompactionMetrics {
819    fn new(registry: &MetricsRegistry) -> Self {
820        let step_timings: CounterVec = registry.register(metric!(
821                name: "mz_persist_compaction_step_seconds",
822                help: "time spent on individual steps of compaction",
823                var_labels: ["step"],
824        ));
825        let schema_selection: CounterVec = registry.register(metric!(
826            name: "mz_persist_compaction_schema_selection",
827            help: "count of compactions and how we did schema selection",
828            var_labels: ["selection"],
829        ));
830
831        CompactionMetrics {
832            requested: registry.register(metric!(
833                name: "mz_persist_compaction_requested",
834                help: "count of total compaction requests",
835            )),
836            dropped: registry.register(metric!(
837                name: "mz_persist_compaction_dropped",
838                help: "count of total compaction requests dropped due to a full queue",
839            )),
840            disabled: registry.register(metric!(
841                name: "mz_persist_compaction_disabled",
842                help: "count of total compaction requests dropped because compaction was disabled",
843            )),
844            skipped: registry.register(metric!(
845                name: "mz_persist_compaction_skipped",
846                help: "count of compactions skipped due to heuristics",
847            )),
848            started: registry.register(metric!(
849                name: "mz_persist_compaction_started",
850                help: "count of compactions started",
851            )),
852            failed: registry.register(metric!(
853                name: "mz_persist_compaction_failed",
854                help: "count of compactions failed",
855            )),
856            applied: registry.register(metric!(
857                name: "mz_persist_compaction_applied",
858                help: "count of compactions applied to state",
859            )),
860            timed_out: registry.register(metric!(
861                name: "mz_persist_compaction_timed_out",
862                help: "count of compactions that timed out",
863            )),
864            noop: registry.register(metric!(
865                name: "mz_persist_compaction_noop",
866                help: "count of compactions discarded (obsolete)",
867            )),
868            seconds: registry.register(metric!(
869                name: "mz_persist_compaction_seconds",
870                help: "time spent in compaction",
871            )),
872            concurrency_waits: registry.register(metric!(
873                name: "mz_persist_compaction_concurrency_waits",
874                help: "count of compaction requests that ever blocked due to concurrency limit",
875            )),
876            queued_seconds: registry.register(metric!(
877                name: "mz_persist_compaction_queued_seconds",
878                help: "time that compaction requests spent queued",
879            )),
880            memory_violations: registry.register(metric!(
881                name: "mz_persist_compaction_memory_violations",
882                help: "count of compaction memory requirement violations",
883            )),
884            runs_compacted: registry.register(metric!(
885                name: "mz_persist_compaction_runs_compacted",
886                help: "count of runs compacted",
887            )),
888            chunks_compacted: registry.register(metric!(
889                name: "mz_persist_compaction_chunks_compacted",
890                help: "count of run chunks compacted",
891            )),
892            not_all_prefetched: registry.register(metric!(
893                name: "mz_persist_compaction_not_all_prefetched",
894                help: "count of compactions where not all inputs were prefetched",
895            )),
896            parts_prefetched: registry.register(metric!(
897                name: "mz_persist_compaction_parts_prefetched",
898                help: "count of compaction parts completely prefetched by the time they're needed",
899            )),
900            parts_waited: registry.register(metric!(
901                name: "mz_persist_compaction_parts_waited",
902                help: "count of compaction parts that had to be waited on",
903            )),
904            fast_path_eligible: registry.register(metric!(
905                name: "mz_persist_compaction_fast_path_eligible",
906                help: "count of compaction requests that could have used the fast-path optimization",
907            )),
908            admin_count: registry.register(metric!(
909                name: "mz_persist_compaction_admin_count",
910                help: "count of compaction requests that were performed by admin tooling",
911            )),
912            applied_exact_match: registry.register(metric!(
913                name: "mz_persist_compaction_applied_exact_match",
914                help: "count of merge results that exactly replaced a SpineBatch",
915            )),
916            applied_subset_match: registry.register(metric!(
917                name: "mz_persist_compaction_applied_subset_match",
918                help: "count of merge results that replaced a subset of a SpineBatch",
919            )),
920            not_applied_too_many_updates: registry.register(metric!(
921                name: "mz_persist_compaction_not_applied_too_many_updates",
922                help: "count of merge results that did not apply due to too many updates",
923            )),
924            batch: BatchWriteMetrics::new(registry, "compaction"),
925            steps: CompactionStepTimings::new(step_timings.clone()),
926            schema_selection: CompactionSchemaSelection::new(schema_selection.clone()),
927            _steps_vec: step_timings,
928        }
929    }
930}
931
/// Per-step compaction timings: one child counter per label value of the
/// `CounterVec` passed to `new`.
#[derive(Debug)]
pub struct CompactionStepTimings {
    /// Child counter for the `part_fetch` label value.
    pub(crate) part_fetch_seconds: Counter,
    /// Child counter for the `heap_population` label value.
    pub(crate) heap_population_seconds: Counter,
}
937
938impl CompactionStepTimings {
939    fn new(step_timings: CounterVec) -> CompactionStepTimings {
940        CompactionStepTimings {
941            part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
942            heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
943        }
944    }
945}
946
/// Counts of compactions by how schema selection was done: one child counter
/// per label value of the `CounterVec` passed to `new`.
#[derive(Debug)]
pub struct CompactionSchemaSelection {
    /// Child counter for the `recent` label value.
    pub(crate) recent_schema: Counter,
    /// Child counter for the `none` label value.
    pub(crate) no_schema: Counter,
}
952
953impl CompactionSchemaSelection {
954    fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
955        CompactionSchemaSelection {
956            recent_schema: schema_selection.with_label_values(&["recent"]),
957            no_schema: schema_selection.with_label_values(&["none"]),
958        }
959    }
960}
961
/// Process-wide garbage collection metrics, registered under the
/// `mz_persist_gc_` prefix.
#[derive(Debug)]
pub struct GcMetrics {
    /// Count of garbage collections skipped because they were already done.
    pub(crate) noop: IntCounter,
    /// Count of garbage collections started.
    pub(crate) started: IntCounter,
    /// Count of garbage collections finished.
    pub(crate) finished: IntCounter,
    /// Count of garbage collection requests merged.
    pub(crate) merged: IntCounter,
    /// Time spent in garbage collections.
    pub(crate) seconds: Counter,
    /// Time spent on individual steps of gc.
    pub(crate) steps: GcStepTimings,
}
971
/// Per-step gc timings: one child counter per label value of the `CounterVec`
/// passed to `new`.
#[derive(Debug)]
pub struct GcStepTimings {
    /// Child counter for the `find_removable_rollups` label value.
    pub(crate) find_removable_rollups: Counter,
    /// Child counter for the `fetch` label value.
    pub(crate) fetch_seconds: Counter,
    /// Child counter for the `find_deletable_blobs` label value.
    pub(crate) find_deletable_blobs_seconds: Counter,
    /// Child counter for the `delete_rollup` label value.
    pub(crate) delete_rollup_seconds: Counter,
    /// Child counter for the `delete_batch_part` label value.
    pub(crate) delete_batch_part_seconds: Counter,
    /// Child counter for the `truncate_diff` label value.
    pub(crate) truncate_diff_seconds: Counter,
    /// Child counter for the `remove_rollups_from_state` label value.
    pub(crate) remove_rollups_from_state: Counter,
    /// Child counter for the `post_gc_calculations` label value.
    pub(crate) post_gc_calculations_seconds: Counter,
}
983
984impl GcStepTimings {
985    fn new(step_timings: CounterVec) -> Self {
986        Self {
987            find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
988            fetch_seconds: step_timings.with_label_values(&["fetch"]),
989            find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
990            delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
991            delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
992            truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
993            remove_rollups_from_state: step_timings
994                .with_label_values(&["remove_rollups_from_state"]),
995            post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
996        }
997    }
998}
999
1000impl GcMetrics {
1001    fn new(registry: &MetricsRegistry) -> Self {
1002        let step_timings: CounterVec = registry.register(metric!(
1003                name: "mz_persist_gc_step_seconds",
1004                help: "time spent on individual steps of gc",
1005                var_labels: ["step"],
1006        ));
1007        GcMetrics {
1008            noop: registry.register(metric!(
1009                name: "mz_persist_gc_noop",
1010                help: "count of garbage collections skipped because they were already done",
1011            )),
1012            started: registry.register(metric!(
1013                name: "mz_persist_gc_started",
1014                help: "count of garbage collections started",
1015            )),
1016            finished: registry.register(metric!(
1017                name: "mz_persist_gc_finished",
1018                help: "count of garbage collections finished",
1019            )),
1020            merged: registry.register(metric!(
1021                name: "mz_persist_gc_merged_reqs",
1022                help: "count of garbage collection requests merged",
1023            )),
1024            seconds: registry.register(metric!(
1025                name: "mz_persist_gc_seconds",
1026                help: "time spent in garbage collections",
1027            )),
1028            steps: GcStepTimings::new(step_timings),
1029        }
1030    }
1031}
1032
/// Process-wide lease metrics, registered under the `mz_persist_lease_`
/// prefix.
#[derive(Debug)]
pub struct LeaseMetrics {
    /// Count of readers whose lease timed out.
    pub(crate) timeout_read: IntCounter,
    /// Count of LeasedBatchParts that were dropped without being politely
    /// returned.
    pub(crate) dropped_part: IntCounter,
}
1038
1039impl LeaseMetrics {
1040    fn new(registry: &MetricsRegistry) -> Self {
1041        LeaseMetrics {
1042            timeout_read: registry.register(metric!(
1043                name: "mz_persist_lease_timeout_read",
1044                help: "count of readers whose lease timed out",
1045            )),
1046            dropped_part: registry.register(metric!(
1047                name: "mz_persist_lease_dropped_part",
1048                help: "count of LeasedBatchParts that were dropped without being politely returned",
1049            )),
1050        }
1051    }
1052}
1053
/// Guard that increments the wrapped counter by one when it is dropped.
struct IncOnDrop(IntCounter);
1055
1056impl Drop for IncOnDrop {
1057    fn drop(&mut self) {
1058        self.0.inc()
1059    }
1060}
1061
/// A [RetryStream] wrapper that records retry metrics as it is used.
pub struct MetricsRetryStream {
    /// The wrapped retry stream that all calls delegate to.
    retry: RetryStream,
    /// Incremented once per call to `sleep`.
    pub(crate) retries: IntCounter,
    /// Incremented in `sleep` by the duration that `next_sleep` reports.
    sleep_seconds: Counter,
    /// Increments the `finished` counter when this value is dropped.
    _finished: IncOnDrop,
}
1068
1069impl MetricsRetryStream {
1070    pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
1071        metrics.started.inc();
1072        MetricsRetryStream {
1073            retry,
1074            retries: metrics.retries.clone(),
1075            sleep_seconds: metrics.sleep_seconds.clone(),
1076            _finished: IncOnDrop(metrics.finished.clone()),
1077        }
1078    }
1079
1080    /// How many times [Self::sleep] has been called.
1081    pub fn attempt(&self) -> usize {
1082        self.retry.attempt()
1083    }
1084
1085    /// The next sleep (without jitter for easy printing in logs).
1086    pub fn next_sleep(&self) -> Duration {
1087        self.retry.next_sleep()
1088    }
1089
1090    /// Executes the next sleep in the series.
1091    ///
1092    /// This isn't cancel-safe, so it consumes and returns self, to prevent
1093    /// accidental mis-use.
1094    pub async fn sleep(self) -> Self {
1095        self.retries.inc();
1096        self.sleep_seconds
1097            .inc_by(self.retry.next_sleep().as_secs_f64());
1098        let retry = self.retry.sleep().await;
1099        MetricsRetryStream {
1100            retry,
1101            retries: self.retries,
1102            sleep_seconds: self.sleep_seconds,
1103            _finished: self._finished,
1104        }
1105    }
1106}
1107
/// One [CodecMetrics] per kind of thing being encoded or decoded, as named by
/// the fields.
#[derive(Debug)]
pub struct CodecsMetrics {
    pub(crate) state: CodecMetrics,
    pub(crate) state_diff: CodecMetrics,
    pub(crate) batch: CodecMetrics,
    pub(crate) key: CodecMetrics,
    pub(crate) val: CodecMetrics,
    // Intentionally not adding time and diff because they're just
    // `{to,from}_le_bytes`.
}
1118
/// Call counts and elapsed time for encode and decode operations, recorded
/// through the `encode` and `decode` wrappers.
#[derive(Debug)]
pub struct CodecMetrics {
    /// Incremented once per `encode` call.
    pub(crate) encode_count: IntCounter,
    /// Seconds elapsed in `encode` calls, accumulated.
    pub(crate) encode_seconds: Counter,
    /// Incremented once per `decode` call.
    pub(crate) decode_count: IntCounter,
    /// Seconds elapsed in `decode` calls, accumulated.
    pub(crate) decode_seconds: Counter,
}
1126
1127impl CodecMetrics {
1128    pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1129        let now = Instant::now();
1130        let r = f();
1131        self.encode_count.inc();
1132        self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
1133        r
1134    }
1135
1136    pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1137        let now = Instant::now();
1138        let r = f();
1139        self.decode_count.inc();
1140        self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
1141        r
1142    }
1143}
1144
/// Process-wide counters about state application and maintenance, registered
/// under the `mz_persist_state_` prefix.
#[derive(Debug)]
pub struct StateMetrics {
    /// Count of spine diff applications that hit the fast path.
    pub(crate) apply_spine_fast_path: IntCounter,
    /// Count of spine diff applications that hit the slow path.
    pub(crate) apply_spine_slow_path: IntCounter,
    /// Count of spine diff applications that hit the lenient compaction apply
    /// path.
    pub(crate) apply_spine_slow_path_lenient: IntCounter,
    /// Count of adjustments made by the lenient compaction apply path.
    pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
    /// Count of spine diff applications that hit the slow path with an extra
    /// spine reconstruction step.
    pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
    /// Count of spine diff applications that flatten the trace.
    pub(crate) apply_spine_flattened: IntCounter,
    /// Count of state update applications that no-oped due to shared state.
    pub(crate) update_state_noop_path: IntCounter,
    /// Count of state update applications that found no new updates.
    pub(crate) update_state_empty_path: IntCounter,
    /// Count of state update applications that hit the fast path.
    pub(crate) update_state_fast_path: IntCounter,
    /// Count of state update applications that hit the slow path.
    pub(crate) update_state_slow_path: IntCounter,
    /// Count of fetch_rollup_at_seqno calls that only worked because of the
    /// migration.
    pub(crate) rollup_at_seqno_migration: IntCounter,
    /// Count of fetch_recent_live_diffs that hit the fast path.
    pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
    /// Count of fetch_recent_live_diffs that hit the slow path.
    pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
    /// Count of writers added to the state.
    pub(crate) writer_added: IntCounter,
    /// Count of writers removed from the state.
    pub(crate) writer_removed: IntCounter,
    /// Count of when hostname diffs needed to be force applied.
    pub(crate) force_apply_hostname: IntCounter,
    /// Count of rollups written successfully (may not be linked in to state).
    pub(crate) rollup_write_success: IntCounter,
    /// Count of no-op rollup writes with reason `latest`.
    pub(crate) rollup_write_noop_latest: IntCounter,
    /// Count of no-op rollup writes with reason `truncated`.
    pub(crate) rollup_write_noop_truncated: IntCounter,
}
1167
1168impl StateMetrics {
1169    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
1170        let rollup_write_noop: IntCounterVec = registry.register(metric!(
1171                name: "mz_persist_state_rollup_write_noop",
1172                help: "count of no-op rollup writes",
1173                var_labels: ["reason"],
1174        ));
1175
1176        StateMetrics {
1177            apply_spine_fast_path: registry.register(metric!(
1178                name: "mz_persist_state_apply_spine_fast_path",
1179                help: "count of spine diff applications that hit the fast path",
1180            )),
1181            apply_spine_slow_path: registry.register(metric!(
1182                name: "mz_persist_state_apply_spine_slow_path",
1183                help: "count of spine diff applications that hit the slow path",
1184            )),
1185            apply_spine_slow_path_lenient: registry.register(metric!(
1186                name: "mz_persist_state_apply_spine_slow_path_lenient",
1187                help: "count of spine diff applications that hit the lenient compaction apply path",
1188            )),
1189            apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
1190                name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
1191                help: "count of adjustments made by the lenient compaction apply path",
1192            )),
1193            apply_spine_slow_path_with_reconstruction: registry.register(metric!(
1194                name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
1195                help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
1196            )),
1197            apply_spine_flattened: registry.register(metric!(
1198                name: "mz_persist_state_apply_spine_flattened",
1199                help: "count of spine diff applications that flatten the trace",
1200            )),
1201            update_state_noop_path: registry.register(metric!(
1202                name: "mz_persist_state_update_state_noop_path",
1203                help: "count of state update applications that no-oped due to shared state",
1204            )),
1205            update_state_empty_path: registry.register(metric!(
1206                name: "mz_persist_state_update_state_empty_path",
1207                help: "count of state update applications that found no new updates",
1208            )),
1209            update_state_fast_path: registry.register(metric!(
1210                name: "mz_persist_state_update_state_fast_path",
1211                help: "count of state update applications that hit the fast path",
1212            )),
1213            update_state_slow_path: registry.register(metric!(
1214                name: "mz_persist_state_update_state_slow_path",
1215                help: "count of state update applications that hit the slow path",
1216            )),
1217            rollup_at_seqno_migration: registry.register(metric!(
1218                name: "mz_persist_state_rollup_at_seqno_migration",
1219                help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
1220            )),
1221            fetch_recent_live_diffs_fast_path: registry.register(metric!(
1222                name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
1223                help: "count of fetch_recent_live_diffs that hit the fast path",
1224            )),
1225            fetch_recent_live_diffs_slow_path: registry.register(metric!(
1226                name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
1227                help: "count of fetch_recent_live_diffs that hit the slow path",
1228            )),
1229            writer_added: registry.register(metric!(
1230                name: "mz_persist_state_writer_added",
1231                help: "count of writers added to the state",
1232            )),
1233            writer_removed: registry.register(metric!(
1234                name: "mz_persist_state_writer_removed",
1235                help: "count of writers removed from the state",
1236            )),
1237            force_apply_hostname: registry.register(metric!(
1238                name: "mz_persist_state_force_applied_hostname",
1239                help: "count of when hostname diffs needed to be force applied",
1240            )),
1241            rollup_write_success: registry.register(metric!(
1242                name: "mz_persist_state_rollup_write_success",
1243                help: "count of rollups written successful (may not be linked in to state)",
1244            )),
1245            rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
1246            rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
1247        }
1248    }
1249}
1250
/// The labeled metric families that per-shard metrics are drawn from, plus
/// the registry of per-shard metric handles currently in use.
///
/// The families whose registration is visible alongside this struct are
/// labeled by `shard` and `name`; the two `batch_part_version_*` families
/// carry an additional `version` label.
#[derive(Debug)]
pub struct ShardsMetrics {
    // Unlike all the other metrics in here, ShardsMetrics intentionally uses
    // the DeleteOnDrop wrappers. A process might stop using a shard (drop all
    // handles to it) but e.g. the set of commands never changes.
    /// Count of all active shards on this process, computed from `shards`.
    _count: ComputedIntGauge,
    /// Since by shard.
    since: mz_ore::metrics::IntGaugeVec,
    /// Upper by shard.
    upper: mz_ore::metrics::IntGaugeVec,
    /// Total encoded rollup size by shard.
    encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
    /// Total encoded diff size by shard.
    encoded_diff_size: mz_ore::metrics::IntCounterVec,
    /// Count of hollow batches by shard.
    hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
    /// Count of spine batches by shard.
    spine_batch_count: mz_ore::metrics::UIntGaugeVec,
    /// Count of batch parts by shard.
    batch_part_count: mz_ore::metrics::UIntGaugeVec,
    /// Count of batch parts by shard and version.
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    /// Total bytes in batch parts by shard and version.
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Count of updates by shard.
    update_count: mz_ore::metrics::UIntGaugeVec,
    /// Count of rollups by shard.
    rollup_count: mz_ore::metrics::UIntGaugeVec,
    /// Largest encoded batch size by shard.
    largest_batch_size: mz_ore::metrics::UIntGaugeVec,
    /// Maximum count of gc-ineligible states by shard.
    seqnos_held: mz_ore::metrics::UIntGaugeVec,
    /// Count of seqnos since last rollup.
    seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
    /// Count of parts referenced by some live state but not the current state
    /// (ie. parts kept only to satisfy seqno holds) at GC time.
    gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
    /// The number of diffs (or, alternatively, the number of seqnos) present
    /// in consensus state at GC time.
    gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
    /// Count of garbage collections finished by shard.
    gc_finished: mz_ore::metrics::IntCounterVec,
    /// Count of compactions applied to state by shard.
    compaction_applied: mz_ore::metrics::IntCounterVec,
    /// Count of commands succeeded by shard.
    cmd_succeeded: mz_ore::metrics::IntCounterVec,
    /// Data in batches/parts referenced by current version of state.
    usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Data in rollups referenced by current version of state.
    usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Data referenced only by a previous version of state.
    usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Data written by an active writer but not referenced by any version of
    /// state.
    usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Data reclaimable by a leaked blob detector.
    usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Number of diffs received via pubsub that applied.
    pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
    /// Number of diffs received via pubsub that did not apply due to
    /// staleness.
    pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
    /// Number of diffs received via pubsub that did not apply due to
    /// out-of-order delivery.
    pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
    // NOTE(review): the registrations for the remaining families (from
    // `stale_version` through `inline_backpressure_count`) were not visible
    // when these docs were written; see their help strings in `new` for the
    // authoritative descriptions.
    stale_version: mz_ore::metrics::UIntGaugeVec,
    blob_gets: mz_ore::metrics::IntCounterVec,
    blob_sets: mz_ore::metrics::IntCounterVec,
    live_writers: mz_ore::metrics::UIntGaugeVec,
    unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
    backpressure_emitted_bytes: IntCounterVec,
    backpressure_last_backpressured_bytes: UIntGaugeVec,
    backpressure_retired_bytes: IntCounterVec,
    rewrite_part_count: UIntGaugeVec,
    inline_part_count: UIntGaugeVec,
    inline_part_bytes: UIntGaugeVec,
    compact_batches: UIntGaugeVec,
    compacting_batches: UIntGaugeVec,
    noncompact_batches: UIntGaugeVec,
    schema_registry_version_count: UIntGaugeVec,
    inline_backpressure_count: IntCounterVec,
    // We hand out `Arc<ShardMetrics>` to read and write handles, but store it
    // here as `Weak`. This allows us to discover if it's no longer in use and
    // so we can remove it from the map.
    shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
}
1305
1306impl ShardsMetrics {
1307    fn new(registry: &MetricsRegistry) -> Self {
1308        let shards = Arc::new(Mutex::new(BTreeMap::new()));
1309        let shards_count = Arc::clone(&shards);
1310        ShardsMetrics {
1311            _count: registry.register_computed_gauge(
1312                metric!(
1313                    name: "mz_persist_shard_count",
1314                    help: "count of all active shards on this process",
1315                ),
1316                move || {
1317                    let mut ret = 0;
1318                    Self::compute(&shards_count, |_m| ret += 1);
1319                    ret
1320                },
1321            ),
1322            since: registry.register(metric!(
1323                name: "mz_persist_shard_since",
1324                help: "since by shard",
1325                var_labels: ["shard", "name"],
1326            )),
1327            upper: registry.register(metric!(
1328                name: "mz_persist_shard_upper",
1329                help: "upper by shard",
1330                var_labels: ["shard", "name"],
1331            )),
1332            encoded_rollup_size: registry.register(metric!(
1333                name: "mz_persist_shard_rollup_size_bytes",
1334                help: "total encoded rollup size by shard",
1335                var_labels: ["shard", "name"],
1336            )),
1337            encoded_diff_size: registry.register(metric!(
1338                name: "mz_persist_shard_diff_size_bytes",
1339                help: "total encoded diff size by shard",
1340                var_labels: ["shard", "name"],
1341            )),
1342            hollow_batch_count: registry.register(metric!(
1343                name: "mz_persist_shard_hollow_batch_count",
1344                help: "count of hollow batches by shard",
1345                var_labels: ["shard", "name"],
1346            )),
1347            spine_batch_count: registry.register(metric!(
1348                name: "mz_persist_shard_spine_batch_count",
1349                help: "count of spine batches by shard",
1350                var_labels: ["shard", "name"],
1351            )),
1352            batch_part_count: registry.register(metric!(
1353                name: "mz_persist_shard_batch_part_count",
1354                help: "count of batch parts by shard",
1355                var_labels: ["shard", "name"],
1356            )),
1357            batch_part_version_count: registry.register(metric!(
1358                name: "mz_persist_shard_batch_part_version_count",
1359                help: "count of batch parts by shard and version",
1360                var_labels: ["shard", "name", "version"],
1361            )),
1362            batch_part_version_bytes: registry.register(metric!(
1363                name: "mz_persist_shard_batch_part_version_bytes",
1364                help: "total bytes in batch parts by shard and version",
1365                var_labels: ["shard", "name", "version"],
1366            )),
1367            update_count: registry.register(metric!(
1368                name: "mz_persist_shard_update_count",
1369                help: "count of updates by shard",
1370                var_labels: ["shard", "name"],
1371            )),
1372            rollup_count: registry.register(metric!(
1373                name: "mz_persist_shard_rollup_count",
1374                help: "count of rollups by shard",
1375                var_labels: ["shard", "name"],
1376            )),
1377            largest_batch_size: registry.register(metric!(
1378                name: "mz_persist_shard_largest_batch_size",
1379                help: "largest encoded batch size by shard",
1380                var_labels: ["shard", "name"],
1381            )),
1382            seqnos_held: registry.register(metric!(
1383                name: "mz_persist_shard_seqnos_held",
1384                help: "maximum count of gc-ineligible states by shard",
1385                var_labels: ["shard", "name"],
1386            )),
1387            seqnos_since_last_rollup: registry.register(metric!(
1388                name: "mz_persist_shard_seqnos_since_last_rollup",
1389                help: "count of seqnos since last rollup",
1390                var_labels: ["shard", "name"],
1391            )),
1392            gc_seqno_held_parts: registry.register(metric!(
1393                name: "mz_persist_shard_gc_seqno_held_parts",
1394                help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
1395                var_labels: ["shard", "name"],
1396            )),
1397            gc_live_diffs: registry.register(metric!(
1398                name: "mz_persist_shard_gc_live_diffs",
1399                help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
1400                var_labels: ["shard", "name"],
1401            )),
1402            gc_finished: registry.register(metric!(
1403                name: "mz_persist_shard_gc_finished",
1404                help: "count of garbage collections finished by shard",
1405                var_labels: ["shard", "name"],
1406            )),
1407            compaction_applied: registry.register(metric!(
1408                name: "mz_persist_shard_compaction_applied",
1409                help: "count of compactions applied to state by shard",
1410                var_labels: ["shard", "name"],
1411            )),
1412            cmd_succeeded: registry.register(metric!(
1413                name: "mz_persist_shard_cmd_succeeded",
1414                help: "count of commands succeeded by shard",
1415                var_labels: ["shard", "name"],
1416            )),
1417            usage_current_state_batches_bytes: registry.register(metric!(
1418                name: "mz_persist_shard_usage_current_state_batches_bytes",
1419                help: "data in batches/parts referenced by current version of state",
1420                var_labels: ["shard", "name"],
1421            )),
1422            usage_current_state_rollups_bytes: registry.register(metric!(
1423                name: "mz_persist_shard_usage_current_state_rollups_bytes",
1424                help: "data in rollups referenced by current version of state",
1425                var_labels: ["shard", "name"],
1426            )),
1427            usage_referenced_not_current_state_bytes: registry.register(metric!(
1428                name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
1429                help: "data referenced only by a previous version of state",
1430                var_labels: ["shard", "name"],
1431            )),
1432            usage_not_leaked_not_referenced_bytes: registry.register(metric!(
1433                name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
1434                help: "data written by an active writer but not referenced by any version of state",
1435                var_labels: ["shard", "name"],
1436            )),
1437            usage_leaked_bytes: registry.register(metric!(
1438                name: "mz_persist_shard_usage_leaked_bytes",
1439                help: "data reclaimable by a leaked blob detector",
1440                var_labels: ["shard", "name"],
1441            )),
1442            pubsub_push_diff_applied: registry.register(metric!(
1443                name: "mz_persist_shard_pubsub_diff_applied",
1444                help: "number of diffs received via pubsub that applied",
1445                var_labels: ["shard", "name"],
1446            )),
1447            pubsub_push_diff_not_applied_stale: registry.register(metric!(
1448                name: "mz_persist_shard_pubsub_diff_not_applied_stale",
1449                help: "number of diffs received via pubsub that did not apply due to staleness",
1450                var_labels: ["shard", "name"],
1451            )),
1452            pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
1453                name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
1454                help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
1455                var_labels: ["shard", "name"],
1456            )),
1457            stale_version: registry.register(metric!(
1458                name: "mz_persist_shard_stale_version",
1459                help: "indicates whether the current version of the shard is less than the current version of the code",
1460                var_labels: ["shard", "name"],
1461            )),
1462            blob_gets: registry.register(metric!(
1463                name: "mz_persist_shard_blob_gets",
1464                help: "number of Blob::get calls for this shard",
1465                var_labels: ["shard", "name"],
1466            )),
1467            blob_sets: registry.register(metric!(
1468                name: "mz_persist_shard_blob_sets",
1469                help: "number of Blob::set calls for this shard",
1470                var_labels: ["shard", "name"],
1471            )),
1472            live_writers: registry.register(metric!(
1473                name: "mz_persist_shard_live_writers",
1474                help: "number of writers that have recently appended updates to this shard",
1475                var_labels: ["shard", "name"],
1476            )),
1477            unconsolidated_snapshot: registry.register(metric!(
1478                name: "mz_persist_shard_unconsolidated_snapshot",
1479                help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
1480                var_labels: ["shard", "name"],
1481            )),
1482            backpressure_emitted_bytes: registry.register(metric!(
1483                name: "mz_persist_backpressure_emitted_bytes",
1484                help: "A counter with the number of emitted bytes.",
1485                var_labels: ["shard", "name"],
1486            )),
1487            backpressure_last_backpressured_bytes: registry.register(metric!(
1488                name: "mz_persist_backpressure_last_backpressured_bytes",
1489                help: "The last count of bytes we are waiting to be retired in \
1490                    the operator. This cannot be directly compared to \
1491                    `retired_bytes`, but CAN indicate that backpressure is happening.",
1492                var_labels: ["shard", "name"],
1493            )),
1494            backpressure_retired_bytes: registry.register(metric!(
1495                name: "mz_persist_backpressure_retired_bytes",
1496                help:"A counter with the number of bytes retired by downstream processing.",
1497                var_labels: ["shard", "name"],
1498            )),
1499            rewrite_part_count: registry.register(metric!(
1500                name: "mz_persist_shard_rewrite_part_count",
1501                help: "count of batch parts with rewrites by shard",
1502                var_labels: ["shard", "name"],
1503            )),
1504            inline_part_count: registry.register(metric!(
1505                name: "mz_persist_shard_inline_part_count",
1506                help: "count of parts inline in shard metadata",
1507                var_labels: ["shard", "name"],
1508            )),
1509            inline_part_bytes: registry.register(metric!(
1510                name: "mz_persist_shard_inline_part_bytes",
1511                help: "total size of parts inline in shard metadata",
1512                var_labels: ["shard", "name"],
1513            )),
1514            compact_batches: registry.register(metric!(
1515                name: "mz_persist_shard_compact_batches",
1516                help: "number of fully compact batches in the shard",
1517                var_labels: ["shard", "name"],
1518            )),
1519            compacting_batches: registry.register(metric!(
1520                name: "mz_persist_shard_compacting_batches",
1521                help: "number of batches in the shard with compactions in progress",
1522                var_labels: ["shard", "name"],
1523            )),
1524            noncompact_batches: registry.register(metric!(
1525                name: "mz_persist_shard_noncompact_batches",
1526                help: "number of batches in the shard that aren't compact and have no ongoing compaction",
1527                var_labels: ["shard", "name"],
1528            )),
1529            schema_registry_version_count: registry.register(metric!(
1530                name: "mz_persist_shard_schema_registry_version_count",
1531                help: "count of versions in the schema registry",
1532                var_labels: ["shard", "name"],
1533            )),
1534            inline_backpressure_count: registry.register(metric!(
1535                name: "mz_persist_shard_inline_backpressure_count",
1536                help: "count of CaA attempts retried because of inline backpressure",
1537                var_labels: ["shard", "name"],
1538            )),
1539            shards,
1540        }
1541    }
1542
1543    pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
1544        let mut shards = self.shards.lock().expect("mutex poisoned");
1545        if let Some(shard) = shards.get(shard_id) {
1546            if let Some(shard) = shard.upgrade() {
1547                return Arc::clone(&shard);
1548            } else {
1549                assert!(shards.remove(shard_id).is_some());
1550            }
1551        }
1552        let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
1553        assert!(
1554            shards
1555                .insert(shard_id.clone(), Arc::downgrade(&shard))
1556                .is_none()
1557        );
1558        shard
1559    }
1560
1561    fn compute<F: FnMut(&ShardMetrics)>(
1562        shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
1563        mut f: F,
1564    ) {
1565        let mut shards = shards.lock().expect("mutex poisoned");
1566        let mut deleted_shards = Vec::new();
1567        for (shard_id, metrics) in shards.iter() {
1568            if let Some(metrics) = metrics.upgrade() {
1569                f(&metrics);
1570            } else {
1571                deleted_shards.push(shard_id.clone());
1572            }
1573        }
1574        for deleted_shard_id in deleted_shards {
1575            assert!(shards.remove(&deleted_shard_id).is_some());
1576        }
1577    }
1578}
1579
1580#[derive(Debug)]
1581pub struct ShardMetrics {
1582    pub shard_id: ShardId,
1583    pub name: String,
1584    pub since: DeleteOnDropGauge<AtomicI64, Vec<String>>,
1585    pub upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
1586    pub largest_batch_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1587    pub latest_rollup_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1588    pub encoded_diff_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1589    pub hollow_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1590    pub spine_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1591    pub batch_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1592    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
1593    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
1594    batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
1595    pub update_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1596    pub rollup_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1597    pub seqnos_held: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1598    pub seqnos_since_last_rollup: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1599    pub gc_seqno_held_parts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1600    pub gc_live_diffs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1601    pub usage_current_state_batches_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1602    pub usage_current_state_rollups_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1603    pub usage_referenced_not_current_state_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1604    pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1605    pub usage_leaked_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1606    pub gc_finished: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1607    pub compaction_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1608    pub cmd_succeeded: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1609    pub pubsub_push_diff_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1610    pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1611    pub pubsub_push_diff_not_applied_out_of_order: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1612    pub stale_version: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1613    pub blob_gets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1614    pub blob_sets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1615    pub live_writers: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1616    pub unconsolidated_snapshot: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1617    pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
1618    pub backpressure_last_backpressured_bytes: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
1619    pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
1620    pub rewrite_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1621    pub inline_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1622    pub inline_part_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1623    pub compact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1624    pub compacting_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1625    pub noncompact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1626    pub schema_registry_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1627    pub inline_backpressure_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
1628}
1629
1630impl ShardMetrics {
1631    pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
1632        let shard = shard_id.to_string();
1633        ShardMetrics {
1634            shard_id: *shard_id,
1635            name: name.to_string(),
1636            since: shards_metrics
1637                .since
1638                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1639            upper: shards_metrics
1640                .upper
1641                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1642            latest_rollup_size: shards_metrics
1643                .encoded_rollup_size
1644                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1645            encoded_diff_size: shards_metrics
1646                .encoded_diff_size
1647                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1648            hollow_batch_count: shards_metrics
1649                .hollow_batch_count
1650                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1651            spine_batch_count: shards_metrics
1652                .spine_batch_count
1653                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1654            batch_part_count: shards_metrics
1655                .batch_part_count
1656                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1657            batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
1658            batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
1659            batch_part_version_map: Mutex::new(BTreeMap::new()),
1660            update_count: shards_metrics
1661                .update_count
1662                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1663            rollup_count: shards_metrics
1664                .rollup_count
1665                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1666            largest_batch_size: shards_metrics
1667                .largest_batch_size
1668                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1669            seqnos_held: shards_metrics
1670                .seqnos_held
1671                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1672            seqnos_since_last_rollup: shards_metrics
1673                .seqnos_since_last_rollup
1674                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1675            gc_seqno_held_parts: shards_metrics
1676                .gc_seqno_held_parts
1677                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1678            gc_live_diffs: shards_metrics
1679                .gc_live_diffs
1680                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1681            gc_finished: shards_metrics
1682                .gc_finished
1683                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1684            compaction_applied: shards_metrics
1685                .compaction_applied
1686                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1687            cmd_succeeded: shards_metrics
1688                .cmd_succeeded
1689                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1690            usage_current_state_batches_bytes: shards_metrics
1691                .usage_current_state_batches_bytes
1692                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1693            usage_current_state_rollups_bytes: shards_metrics
1694                .usage_current_state_rollups_bytes
1695                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1696            usage_referenced_not_current_state_bytes: shards_metrics
1697                .usage_referenced_not_current_state_bytes
1698                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1699            usage_not_leaked_not_referenced_bytes: shards_metrics
1700                .usage_not_leaked_not_referenced_bytes
1701                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1702            usage_leaked_bytes: shards_metrics
1703                .usage_leaked_bytes
1704                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1705            pubsub_push_diff_applied: shards_metrics
1706                .pubsub_push_diff_applied
1707                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1708            pubsub_push_diff_not_applied_stale: shards_metrics
1709                .pubsub_push_diff_not_applied_stale
1710                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1711            pubsub_push_diff_not_applied_out_of_order: shards_metrics
1712                .pubsub_push_diff_not_applied_out_of_order
1713                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1714            stale_version: shards_metrics
1715                .stale_version
1716                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1717            blob_gets: shards_metrics
1718                .blob_gets
1719                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1720            blob_sets: shards_metrics
1721                .blob_sets
1722                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1723            live_writers: shards_metrics
1724                .live_writers
1725                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1726            unconsolidated_snapshot: shards_metrics
1727                .unconsolidated_snapshot
1728                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1729            backpressure_emitted_bytes: Arc::new(
1730                shards_metrics
1731                    .backpressure_emitted_bytes
1732                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1733            ),
1734            backpressure_last_backpressured_bytes: Arc::new(
1735                shards_metrics
1736                    .backpressure_last_backpressured_bytes
1737                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1738            ),
1739            backpressure_retired_bytes: Arc::new(
1740                shards_metrics
1741                    .backpressure_retired_bytes
1742                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1743            ),
1744            rewrite_part_count: shards_metrics
1745                .rewrite_part_count
1746                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1747            inline_part_count: shards_metrics
1748                .inline_part_count
1749                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1750            inline_part_bytes: shards_metrics
1751                .inline_part_bytes
1752                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1753            compact_batches: shards_metrics
1754                .compact_batches
1755                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1756            compacting_batches: shards_metrics
1757                .compacting_batches
1758                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1759            noncompact_batches: shards_metrics
1760                .noncompact_batches
1761                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1762            schema_registry_version_count: shards_metrics
1763                .schema_registry_version_count
1764                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1765            inline_backpressure_count: shards_metrics
1766                .inline_backpressure_count
1767                .get_delete_on_drop_metric(vec![shard, name.to_string()]),
1768        }
1769    }
1770
1771    pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
1772        self.since.set(encode_ts_metric(since))
1773    }
1774
1775    pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
1776        self.upper.set(encode_ts_metric(upper))
1777    }
1778
1779    pub(crate) fn set_batch_part_versions<'a>(
1780        &self,
1781        batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
1782    ) {
1783        let mut map = self
1784            .batch_part_version_map
1785            .lock()
1786            .expect("mutex should not be poisoned");
1787        // NB: It's a bit sus that the below assumes that no one else is
1788        // concurrently modifying the atomics in the gauges, but we're holding
1789        // the mutex this whole time, so it should be true.
1790
1791        // We want to do this in a way that avoids allocating (e.g. summing up a
1792        // map). First reset everything.
1793        for x in map.values() {
1794            x.batch_part_version_count.set(0);
1795            x.batch_part_version_bytes.set(0);
1796        }
1797
1798        // Then go through the iterator, creating new entries as necessary and
1799        // adding.
1800        for (key, bytes) in batch_parts_by_version {
1801            if !map.contains_key(key) {
1802                map.insert(
1803                    key.to_owned(),
1804                    BatchPartVersionMetrics {
1805                        batch_part_version_count: self
1806                            .batch_part_version_count
1807                            .get_delete_on_drop_metric(vec![
1808                                self.shard_id.to_string(),
1809                                self.name.clone(),
1810                                key.to_owned(),
1811                            ]),
1812                        batch_part_version_bytes: self
1813                            .batch_part_version_bytes
1814                            .get_delete_on_drop_metric(vec![
1815                                self.shard_id.to_string(),
1816                                self.name.clone(),
1817                                key.to_owned(),
1818                            ]),
1819                    },
1820                );
1821            }
1822            let value = map.get(key).expect("inserted above");
1823            value.batch_part_version_count.inc();
1824            value.batch_part_version_bytes.add(u64::cast_from(bytes));
1825        }
1826    }
1827}
1828
1829#[derive(Debug)]
1830pub struct BatchPartVersionMetrics {
1831    pub batch_part_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1832    pub batch_part_version_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
1833}
1834
1835/// Metrics recorded by audits of persist usage
1836#[derive(Debug)]
1837pub struct UsageAuditMetrics {
1838    /// Size of all batch parts stored in Blob
1839    pub blob_batch_part_bytes: UIntGauge,
1840    /// Count of batch parts stored in Blob
1841    pub blob_batch_part_count: UIntGauge,
1842    /// Size of all state rollups stored in Blob
1843    pub blob_rollup_bytes: UIntGauge,
1844    /// Count of state rollups stored in Blob
1845    pub blob_rollup_count: UIntGauge,
1846    /// Size of Blob
1847    pub blob_bytes: UIntGauge,
1848    /// Count of all blobs
1849    pub blob_count: UIntGauge,
1850    /// Time spent fetching blob metadata
1851    pub step_blob_metadata: Counter,
1852    /// Time spent fetching state versions
1853    pub step_state: Counter,
1854    /// Time spent doing math
1855    pub step_math: Counter,
1856}
1857
1858impl UsageAuditMetrics {
1859    fn new(registry: &MetricsRegistry) -> Self {
1860        let step_timings: CounterVec = registry.register(metric!(
1861                name: "mz_persist_audit_step_seconds",
1862                help: "time spent on individual steps of audit",
1863                var_labels: ["step"],
1864        ));
1865        UsageAuditMetrics {
1866            blob_batch_part_bytes: registry.register(metric!(
1867                name: "mz_persist_audit_blob_batch_part_bytes",
1868                help: "total size of batch parts in blob",
1869            )),
1870            blob_batch_part_count: registry.register(metric!(
1871                name: "mz_persist_audit_blob_batch_part_count",
1872                help: "count of batch parts in blob",
1873            )),
1874            blob_rollup_bytes: registry.register(metric!(
1875                name: "mz_persist_audit_blob_rollup_bytes",
1876                help: "total size of state rollups stored in blob",
1877            )),
1878            blob_rollup_count: registry.register(metric!(
1879                name: "mz_persist_audit_blob_rollup_count",
1880                help: "count of all state rollups in blob",
1881            )),
1882            blob_bytes: registry.register(metric!(
1883                name: "mz_persist_audit_blob_bytes",
1884                help: "total size of blob",
1885            )),
1886            blob_count: registry.register(metric!(
1887                name: "mz_persist_audit_blob_count",
1888                help: "count of all blobs",
1889            )),
1890            step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
1891            step_state: step_timings.with_label_values(&["state"]),
1892            step_math: step_timings.with_label_values(&["math"]),
1893        }
1894    }
1895}
1896
/// The signed change in how many updates a data structure holds, for
/// example when a buffer's length or capacity changes.
///
/// The magnitude is stored unsigned; the variant carries the direction.
#[derive(Debug)]
pub enum UpdateDelta {
    /// The number of updates went down by the contained amount.
    Negative(u64),
    /// The number of updates went up by the contained amount, or stayed the
    /// same when the amount is zero.
    NonNegative(u64),
}
1906
1907impl UpdateDelta {
1908    /// Creates a new `UpdateDelta` from the difference between a new value
1909    /// for a number of updates and the corresponding old value.
1910    pub fn new(new: usize, old: usize) -> Self {
1911        if new < old {
1912            UpdateDelta::Negative(CastFrom::cast_from(old - new))
1913        } else {
1914            UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
1915        }
1916    }
1917}
1918
1919/// Metrics for the persist sink. (While this lies slightly outside the usual
1920/// abstraction boundary of the client, it's convenient to manage them together.
1921#[derive(Debug, Clone)]
1922pub struct SinkMetrics {
1923    /// Cumulative record insertions made to the correction buffer across workers
1924    correction_insertions_total: IntCounter,
1925    /// Cumulative record deletions made to the correction buffer across workers
1926    correction_deletions_total: IntCounter,
1927    /// Cumulative capacity increases made to the correction buffer across workers
1928    correction_capacity_increases_total: IntCounter,
1929    /// Cumulative capacity decreases made to the correction buffer across workers
1930    correction_capacity_decreases_total: IntCounter,
1931    /// Maximum length observed for any one correction buffer per worker
1932    correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
1933    /// Maximum capacity observed for any one correction buffer per worker
1934    correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
1935}
1936
1937impl SinkMetrics {
1938    fn new(registry: &MetricsRegistry) -> Self {
1939        SinkMetrics {
1940            correction_insertions_total: registry.register(metric!(
1941                name: "mz_persist_sink_correction_insertions_total",
1942                help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
1943            )),
1944            correction_deletions_total: registry.register(metric!(
1945                name: "mz_persist_sink_correction_deletions_total",
1946                help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
1947            )),
1948            correction_capacity_increases_total: registry.register(metric!(
1949                name: "mz_persist_sink_correction_capacity_increases_total",
1950                help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
1951            )),
1952            correction_capacity_decreases_total: registry.register(metric!(
1953                name: "mz_persist_sink_correction_capacity_decreases_total",
1954                help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
1955            )),
1956            correction_max_per_sink_worker_len_updates: registry.register(metric!(
1957                name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
1958                help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
1959                var_labels: ["worker_id"],
1960            )),
1961            correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
1962                name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
1963                help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
1964                var_labels: ["worker_id"],
1965            )),
1966        }
1967    }
1968
1969    /// Obtains a `SinkWorkerMetrics` instance, which allows for metric reporting
1970    /// from a specific `persist_sink` instance for a given worker. The reports will
1971    /// update metrics shared across workers, but provide per-worker contributions
1972    /// to them.
1973    pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
1974        let worker = worker_id.to_string();
1975        let correction_max_per_sink_worker_len_updates = self
1976            .correction_max_per_sink_worker_len_updates
1977            .with_label_values(&[&worker]);
1978        let correction_max_per_sink_worker_capacity_updates = self
1979            .correction_max_per_sink_worker_capacity_updates
1980            .with_label_values(&[&worker]);
1981        SinkWorkerMetrics {
1982            correction_max_per_sink_worker_len_updates,
1983            correction_max_per_sink_worker_capacity_updates,
1984        }
1985    }
1986
1987    /// Reports updates to the length and capacity of the correction buffer in the
1988    /// `write_batches` operator of a `persist_sink`.
1989    ///
1990    /// This method updates monotonic metrics based on the deltas and thus can be
1991    /// called across workers and instances of `persist_sink`.
1992    pub fn report_correction_update_deltas(
1993        &self,
1994        correction_len_delta: UpdateDelta,
1995        correction_cap_delta: UpdateDelta,
1996    ) {
1997        // Report insertions or deletions.
1998        match correction_len_delta {
1999            UpdateDelta::NonNegative(delta) => {
2000                if delta > 0 {
2001                    self.correction_insertions_total.inc_by(delta)
2002                }
2003            }
2004            UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
2005        }
2006        // Report capacity increases or decreases.
2007        match correction_cap_delta {
2008            UpdateDelta::NonNegative(delta) => {
2009                if delta > 0 {
2010                    self.correction_capacity_increases_total.inc_by(delta)
2011                }
2012            }
2013            UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
2014        }
2015    }
2016}
2017
2018/// Metrics for the persist sink that are labeled per-worker.
2019#[derive(Clone, Debug)]
2020pub struct SinkWorkerMetrics {
2021    correction_max_per_sink_worker_len_updates: UIntGauge,
2022    correction_max_per_sink_worker_capacity_updates: UIntGauge,
2023}
2024
2025impl SinkWorkerMetrics {
2026    /// Reports the length and capacity of the correction buffer in the `write_batches`
2027    /// operator of `persist_sink`.
2028    ///
2029    /// This method is used to update metrics that are kept per worker.
2030    pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
2031        // Maintain per-worker peaks.
2032        let correction_len = CastFrom::cast_from(correction_len);
2033        if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
2034            self.correction_max_per_sink_worker_len_updates
2035                .set(correction_len);
2036        }
2037        let correction_cap = CastFrom::cast_from(correction_cap);
2038        if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
2039            self.correction_max_per_sink_worker_capacity_updates
2040                .set(correction_cap);
2041        }
2042    }
2043}
2044
2045/// A minimal set of metrics imported into honeycomb for alerting.
2046#[derive(Debug)]
2047pub struct AlertsMetrics {
2048    pub(crate) blob_failures: IntCounter,
2049    pub(crate) consensus_failures: IntCounter,
2050}
2051
2052impl AlertsMetrics {
2053    fn new(registry: &MetricsRegistry) -> Self {
2054        AlertsMetrics {
2055            blob_failures: registry.register(metric!(
2056                name: "mz_persist_blob_failures",
2057                help: "count of all blob operation failures",
2058                const_labels: {"honeycomb" => "import"},
2059            )),
2060            consensus_failures: registry.register(metric!(
2061                name: "mz_persist_consensus_failures",
2062                help: "count of determinate consensus operation failures",
2063                const_labels: {"honeycomb" => "import"},
2064            )),
2065        }
2066    }
2067}
2068
2069/// Metrics for the PubSubServer implementation.
2070#[derive(Debug)]
2071pub struct PubSubServerMetrics {
2072    pub(crate) active_connections: UIntGauge,
2073    pub(crate) broadcasted_diff_count: IntCounter,
2074    pub(crate) broadcasted_diff_bytes: IntCounter,
2075    pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,
2076
2077    pub(crate) push_seconds: Counter,
2078    pub(crate) subscribe_seconds: Counter,
2079    pub(crate) unsubscribe_seconds: Counter,
2080    pub(crate) connection_cleanup_seconds: Counter,
2081
2082    pub(crate) push_call_count: IntCounter,
2083    pub(crate) subscribe_call_count: IntCounter,
2084    pub(crate) unsubscribe_call_count: IntCounter,
2085}
2086
2087impl PubSubServerMetrics {
2088    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
2089        let op_timings: CounterVec = registry.register(metric!(
2090                name: "mz_persist_pubsub_server_operation_seconds",
2091                help: "time spent in pubsub server performing each operation",
2092                var_labels: ["op"],
2093        ));
2094        let call_count: IntCounterVec = registry.register(metric!(
2095                name: "mz_persist_pubsub_server_call_count",
2096                help: "count of each pubsub server message received",
2097                var_labels: ["call"],
2098        ));
2099
2100        Self {
2101            active_connections: registry.register(metric!(
2102                    name: "mz_persist_pubsub_server_active_connections",
2103                    help: "number of active connections to server",
2104            )),
2105            broadcasted_diff_count: registry.register(metric!(
2106                    name: "mz_persist_pubsub_server_broadcasted_diff_count",
2107                    help: "count of total broadcast diff messages sent",
2108            )),
2109            broadcasted_diff_bytes: registry.register(metric!(
2110                    name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
2111                    help: "count of total broadcast diff bytes sent",
2112            )),
2113            broadcasted_diff_dropped_channel_full: registry.register(metric!(
2114                    name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
2115                    help: "count of diffs dropped due to full connection channel",
2116            )),
2117
2118            push_seconds: op_timings.with_label_values(&["push"]),
2119            subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
2120            unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
2121            connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),
2122
2123            push_call_count: call_count.with_label_values(&["push"]),
2124            subscribe_call_count: call_count.with_label_values(&["subscribe"]),
2125            unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
2126        }
2127    }
2128}
2129
2130/// Metrics for the PubSubClient implementation.
2131#[derive(Debug)]
2132pub struct PubSubClientMetrics {
2133    pub sender: PubSubClientSenderMetrics,
2134    pub receiver: PubSubClientReceiverMetrics,
2135    pub grpc_connection: PubSubGrpcClientConnectionMetrics,
2136}
2137
2138impl PubSubClientMetrics {
2139    fn new(registry: &MetricsRegistry) -> Self {
2140        PubSubClientMetrics {
2141            sender: PubSubClientSenderMetrics::new(registry),
2142            receiver: PubSubClientReceiverMetrics::new(registry),
2143            grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
2144        }
2145    }
2146}
2147
2148#[derive(Debug)]
2149pub struct PubSubGrpcClientConnectionMetrics {
2150    pub(crate) connected: UIntGauge,
2151    pub(crate) connection_established_count: IntCounter,
2152    pub(crate) connect_call_attempt_count: IntCounter,
2153    pub(crate) broadcast_recv_lagged_count: IntCounter,
2154    pub(crate) grpc_error_count: IntCounter,
2155}
2156
2157impl PubSubGrpcClientConnectionMetrics {
2158    fn new(registry: &MetricsRegistry) -> Self {
2159        Self {
2160            connected: registry.register(metric!(
2161                    name: "mz_persist_pubsub_client_grpc_connected",
2162                    help: "whether the grpc client is currently connected",
2163            )),
2164            connection_established_count: registry.register(metric!(
2165                    name: "mz_persist_pubsub_client_grpc_connection_established_count",
2166                    help: "count of grpc connection establishments to pubsub server",
2167            )),
2168            connect_call_attempt_count: registry.register(metric!(
2169                    name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
2170                    help: "count of connection call attempts (including retries) to pubsub server",
2171            )),
2172            broadcast_recv_lagged_count: registry.register(metric!(
2173                    name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
2174                    help: "times a message was missed by broadcast receiver due to lag",
2175            )),
2176            grpc_error_count: registry.register(metric!(
2177                    name: "mz_persist_pubsub_client_grpc_error_count",
2178                    help: "count of grpc errors received",
2179            )),
2180        }
2181    }
2182}
2183
2184#[derive(Clone, Debug)]
2185pub struct PubSubClientReceiverMetrics {
2186    pub(crate) push_received: IntCounter,
2187    pub(crate) unknown_message_received: IntCounter,
2188    pub(crate) approx_diff_latency_seconds: Histogram,
2189
2190    pub(crate) state_pushed_diff_fast_path: IntCounter,
2191    pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
2192    pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
2193}
2194
2195impl PubSubClientReceiverMetrics {
2196    fn new(registry: &MetricsRegistry) -> Self {
2197        let call_received: IntCounterVec = registry.register(metric!(
2198                name: "mz_persist_pubsub_client_call_received",
2199                help: "times a pubsub client call was received",
2200                var_labels: ["call"],
2201        ));
2202
2203        Self {
2204            push_received: call_received.with_label_values(&["push"]),
2205            unknown_message_received: call_received.with_label_values(&["unknown"]),
2206            approx_diff_latency_seconds: registry.register(metric!(
2207                name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
2208                help: "histogram of (approximate) latency between sending a diff and applying it",
2209                buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
2210            )),
2211
2212            state_pushed_diff_fast_path: registry.register(metric!(
2213                name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
2214                help: "count fast-path state push_diff calls",
2215            )),
2216            state_pushed_diff_slow_path_succeeded: registry.register(metric!(
2217                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
2218                help: "count of successful slow-path state push_diff calls",
2219            )),
2220            state_pushed_diff_slow_path_failed: registry.register(metric!(
2221                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
2222                help: "count of unsuccessful slow-path state push_diff calls",
2223            )),
2224        }
2225    }
2226}
2227
2228#[derive(Debug)]
2229pub struct PubSubClientSenderMetrics {
2230    pub push: PubSubClientCallMetrics,
2231    pub subscribe: PubSubClientCallMetrics,
2232    pub unsubscribe: PubSubClientCallMetrics,
2233}
2234
2235#[derive(Debug)]
2236pub struct PubSubClientCallMetrics {
2237    pub(crate) succeeded: IntCounter,
2238    pub(crate) bytes_sent: IntCounter,
2239    pub(crate) failed: IntCounter,
2240}
2241
2242impl PubSubClientSenderMetrics {
2243    fn new(registry: &MetricsRegistry) -> Self {
2244        let call_bytes_sent: IntCounterVec = registry.register(metric!(
2245                name: "mz_persist_pubsub_client_call_bytes_sent",
2246                help: "number of bytes sent for a given pubsub client call",
2247                var_labels: ["call"],
2248        ));
2249        let call_succeeded: IntCounterVec = registry.register(metric!(
2250                name: "mz_persist_pubsub_client_call_succeeded",
2251                help: "times a pubsub client call succeeded",
2252                var_labels: ["call"],
2253        ));
2254        let call_failed: IntCounterVec = registry.register(metric!(
2255                name: "mz_persist_pubsub_client_call_failed",
2256                help: "times a pubsub client call failed",
2257                var_labels: ["call"],
2258        ));
2259
2260        Self {
2261            push: PubSubClientCallMetrics {
2262                succeeded: call_succeeded.with_label_values(&["push"]),
2263                failed: call_failed.with_label_values(&["push"]),
2264                bytes_sent: call_bytes_sent.with_label_values(&["push"]),
2265            },
2266            subscribe: PubSubClientCallMetrics {
2267                succeeded: call_succeeded.with_label_values(&["subscribe"]),
2268                failed: call_failed.with_label_values(&["subscribe"]),
2269                bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
2270            },
2271            unsubscribe: PubSubClientCallMetrics {
2272                succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
2273                failed: call_failed.with_label_values(&["unsubscribe"]),
2274                bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
2275            },
2276        }
2277    }
2278}
2279
2280#[derive(Debug)]
2281pub struct LocksMetrics {
2282    pub(crate) applier_read_cacheable: LockMetrics,
2283    pub(crate) applier_read_noncacheable: LockMetrics,
2284    pub(crate) applier_write: LockMetrics,
2285    pub(crate) watch: LockMetrics,
2286}
2287
2288#[derive(Debug, Clone)]
2289pub struct LockMetrics {
2290    pub(crate) acquire_count: IntCounter,
2291    pub(crate) blocking_acquire_count: IntCounter,
2292    pub(crate) blocking_seconds: Counter,
2293}
2294
2295#[derive(Debug)]
2296pub struct WatchMetrics {
2297    pub(crate) wait_woken_via_watch: IntCounter,
2298    pub(crate) wait_woken_via_sleep: IntCounter,
2299    pub(crate) wait_resolved_via_watch: IntCounter,
2300    pub(crate) wait_resolved_via_sleep: IntCounter,
2301    pub(crate) notify_sent: IntCounter,
2302    pub(crate) notify_noop: IntCounter,
2303    pub(crate) notify_recv: IntCounter,
2304    pub(crate) notify_lagged: IntCounter,
2305    pub(crate) notify_wait_started: IntCounter,
2306    pub(crate) notify_wait_finished: IntCounter,
2307}
2308
2309impl WatchMetrics {
2310    fn new(registry: &MetricsRegistry) -> Self {
2311        WatchMetrics {
2312            wait_woken_via_watch: registry.register(metric!(
2313                name: "mz_persist_wait_woken_via_watch",
2314                help: "count of wait-for-uppers wakes via watch notify",
2315            )),
2316            wait_woken_via_sleep: registry.register(metric!(
2317                name: "mz_persist_wait_woken_via_sleep",
2318                help: "count of wait-for-uppers wakes via sleep",
2319            )),
2320            wait_resolved_via_watch: registry.register(metric!(
2321                name: "mz_persist_wait_resolved_via_watch",
2322                help: "count of wait-for-uppers resolved via watch notify",
2323            )),
2324            wait_resolved_via_sleep: registry.register(metric!(
2325                name: "mz_persist_wait_resolved_via_sleep",
2326                help: "count of wait-for-uppers resolved via sleep",
2327            )),
2328            notify_sent: registry.register(metric!(
2329                name: "mz_persist_watch_notify_sent",
2330                help: "count of watch notifications sent to a non-empty broadcast channel",
2331            )),
2332            notify_noop: registry.register(metric!(
2333                name: "mz_persist_watch_notify_noop",
2334                help: "count of watch notifications sent to an broadcast channel",
2335            )),
2336            notify_recv: registry.register(metric!(
2337                name: "mz_persist_watch_notify_recv",
2338                help: "count of watch notifications received from the broadcast channel",
2339            )),
2340            notify_lagged: registry.register(metric!(
2341                name: "mz_persist_watch_notify_lagged",
2342                help: "count of lagged events in the watch notification broadcast channel",
2343            )),
2344            notify_wait_started: registry.register(metric!(
2345                name: "mz_persist_watch_notify_wait_started",
2346                help: "count of watch wait calls started",
2347            )),
2348            notify_wait_finished: registry.register(metric!(
2349                name: "mz_persist_watch_notify_wait_finished",
2350                help: "count of watch wait calls resolved",
2351            )),
2352        }
2353    }
2354}
2355
2356#[derive(Debug)]
2357pub struct PushdownMetrics {
2358    pub(crate) parts_filtered_count: IntCounter,
2359    pub(crate) parts_filtered_bytes: IntCounter,
2360    pub(crate) parts_fetched_count: IntCounter,
2361    pub(crate) parts_fetched_bytes: IntCounter,
2362    pub(crate) parts_audited_count: IntCounter,
2363    pub(crate) parts_audited_bytes: IntCounter,
2364    pub(crate) parts_inline_count: IntCounter,
2365    pub(crate) parts_inline_bytes: IntCounter,
2366    pub(crate) parts_faked_count: IntCounter,
2367    pub(crate) parts_faked_bytes: IntCounter,
2368    pub(crate) parts_stats_trimmed_count: IntCounter,
2369    pub(crate) parts_stats_trimmed_bytes: IntCounter,
2370    pub(crate) parts_projection_trimmed_bytes: IntCounter,
2371    pub part_stats: PartStatsMetrics,
2372}
2373
2374impl PushdownMetrics {
2375    fn new(registry: &MetricsRegistry) -> Self {
2376        PushdownMetrics {
2377            parts_filtered_count: registry.register(metric!(
2378                name: "mz_persist_pushdown_parts_filtered_count",
2379                help: "count of parts filtered by pushdown",
2380            )),
2381            parts_filtered_bytes: registry.register(metric!(
2382                name: "mz_persist_pushdown_parts_filtered_bytes",
2383                help: "total size of parts filtered by pushdown in bytes",
2384            )),
2385            parts_fetched_count: registry.register(metric!(
2386                name: "mz_persist_pushdown_parts_fetched_count",
2387                help: "count of parts not filtered by pushdown",
2388            )),
2389            parts_fetched_bytes: registry.register(metric!(
2390                name: "mz_persist_pushdown_parts_fetched_bytes",
2391                help: "total size of parts not filtered by pushdown in bytes",
2392            )),
2393            parts_audited_count: registry.register(metric!(
2394                name: "mz_persist_pushdown_parts_audited_count",
2395                help: "count of parts fetched only for pushdown audit",
2396            )),
2397            parts_audited_bytes: registry.register(metric!(
2398                name: "mz_persist_pushdown_parts_audited_bytes",
2399                help: "total size of parts fetched only for pushdown audit",
2400            )),
2401            parts_inline_count: registry.register(metric!(
2402                name: "mz_persist_pushdown_parts_inline_count",
2403                help: "count of parts not fetched because they were inline",
2404            )),
2405            parts_inline_bytes: registry.register(metric!(
2406                name: "mz_persist_pushdown_parts_inline_bytes",
2407                help: "total size of parts not fetched because they were inline",
2408            )),
2409            parts_faked_count: registry.register(metric!(
2410                name: "mz_persist_pushdown_parts_faked_count",
2411                help: "count of parts faked because of aggressive projection pushdown",
2412            )),
2413            parts_faked_bytes: registry.register(metric!(
2414                name: "mz_persist_pushdown_parts_faked_bytes",
2415                help: "total size of parts replaced with fakes by aggressive projection pushdown",
2416            )),
2417            parts_stats_trimmed_count: registry.register(metric!(
2418                name: "mz_persist_pushdown_parts_stats_trimmed_count",
2419                help: "count of trimmed part stats",
2420            )),
2421            parts_stats_trimmed_bytes: registry.register(metric!(
2422                name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
2423                help: "total bytes trimmed from part stats",
2424            )),
2425            parts_projection_trimmed_bytes: registry.register(metric!(
2426                name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
2427                help: "total bytes trimmed from columnar data because of projection pushdown",
2428            )),
2429            part_stats: PartStatsMetrics::new(registry),
2430        }
2431    }
2432}
2433
2434#[derive(Debug)]
2435pub struct ConsolidationMetrics {
2436    pub(crate) parts_fetched: IntCounter,
2437    pub(crate) parts_skipped: IntCounter,
2438    pub(crate) parts_wasted: IntCounter,
2439    pub(crate) wrong_sort: IntCounter,
2440}
2441
2442impl ConsolidationMetrics {
2443    fn new(registry: &MetricsRegistry) -> Self {
2444        ConsolidationMetrics {
2445            parts_fetched: registry.register(metric!(
2446                name: "mz_persist_consolidation_parts_fetched_count",
2447                help: "count of parts that were fetched and used during consolidation",
2448            )),
2449            parts_skipped: registry.register(metric!(
2450                name: "mz_persist_consolidation_parts_skipped_count",
2451                help: "count of parts that were never needed during consolidation",
2452            )),
2453            parts_wasted: registry.register(metric!(
2454                name: "mz_persist_consolidation_parts_wasted_count",
2455                help: "count of parts that were fetched but not needed during consolidation",
2456            )),
2457            wrong_sort: registry.register(metric!(
2458                name: "mz_persist_consolidation_wrong_sort_count",
2459                help: "count of runs that were sorted using the wrong ordering for the current consolidation",
2460            )),
2461        }
2462    }
2463}
2464
2465#[derive(Debug)]
2466pub struct BlobMemCache {
2467    pub(crate) size_blobs: UIntGauge,
2468    pub(crate) size_bytes: UIntGauge,
2469    pub(crate) hits_blobs: IntCounter,
2470    pub(crate) hits_bytes: IntCounter,
2471    pub(crate) evictions: IntCounter,
2472}
2473
2474impl BlobMemCache {
2475    fn new(registry: &MetricsRegistry) -> Self {
2476        BlobMemCache {
2477            size_blobs: registry.register(metric!(
2478                name: "mz_persist_blob_cache_size_blobs",
2479                help: "count of blobs in the cache",
2480                const_labels: {"cache" => "mem"},
2481            )),
2482            size_bytes: registry.register(metric!(
2483                name: "mz_persist_blob_cache_size_bytes",
2484                help: "total size of blobs in the cache",
2485                const_labels: {"cache" => "mem"},
2486            )),
2487            hits_blobs: registry.register(metric!(
2488                name: "mz_persist_blob_cache_hits_blobs",
2489                help: "count of blobs served via cache instead of s3",
2490                const_labels: {"cache" => "mem"},
2491            )),
2492            hits_bytes: registry.register(metric!(
2493                name: "mz_persist_blob_cache_hits_bytes",
2494                help: "total size of blobs served via cache instead of s3",
2495                const_labels: {"cache" => "mem"},
2496            )),
2497            evictions: registry.register(metric!(
2498                name: "mz_persist_blob_cache_evictions",
2499                help: "count of capacity-based cache evictions",
2500                const_labels: {"cache" => "mem"},
2501            )),
2502        }
2503    }
2504}
2505
2506#[derive(Debug)]
2507pub struct SemaphoreMetrics {
2508    cfg: PersistConfig,
2509    registry: MetricsRegistry,
2510    fetch: OnceCell<MetricsSemaphore>,
2511}
2512
2513impl SemaphoreMetrics {
2514    fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
2515        SemaphoreMetrics {
2516            cfg,
2517            registry,
2518            fetch: OnceCell::new(),
2519        }
2520    }
2521
2522    /// We can't easily change the number of permits, and the dyncfgs are all
2523    /// set to defaults on process start, so make sure we only initialize the
2524    /// semaphore once we've synced dyncfgs at least once.
2525    async fn fetch(&self) -> &MetricsSemaphore {
2526        if let Some(x) = self.fetch.get() {
2527            // Common case of already initialized avoids the cloning below.
2528            return x;
2529        }
2530        let cfg = self.cfg.clone();
2531        let registry = self.registry.clone();
2532        let init = async move {
2533            let total_permits = match cfg.announce_memory_limit {
2534                // Non-cc replicas have the old physical flow control mechanism,
2535                // so only apply this one on cc replicas.
2536                Some(mem) if cfg.is_cc_active => {
2537                    // We can't easily adjust the number of permits later, so
2538                    // make sure we've synced dyncfg values at least once.
2539                    info!("fetch semaphore awaiting first dyncfg values");
2540                    let () = cfg.configs_synced_once().await;
2541                    let total_permits = usize::cast_lossy(
2542                        f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
2543                    );
2544                    info!("fetch_semaphore got first dyncfg values");
2545                    total_permits
2546                }
2547                Some(_) | None => Semaphore::MAX_PERMITS,
2548            };
2549            MetricsSemaphore::new(&registry, "fetch", total_permits)
2550        };
2551        self.fetch.get_or_init(|| init).await
2552    }
2553
2554    pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
2555        // Adjust the requested permits to account for the difference between
2556        // encoded_size_bytes and the decoded size in lgalloc.
2557        let requested_permits = f64::cast_lossy(encoded_size_bytes);
2558        let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
2559        let requested_permits = usize::cast_lossy(requested_permits);
2560        self.fetch().await.acquire_permits(requested_permits).await
2561    }
2562}
2563
2564#[derive(Debug)]
2565pub struct MetricsSemaphore {
2566    name: &'static str,
2567    semaphore: Arc<Semaphore>,
2568    total_permits: usize,
2569    acquire_count: IntCounter,
2570    blocking_count: IntCounter,
2571    blocking_seconds: Counter,
2572    acquired_permits: IntCounter,
2573    released_permits: IntCounter,
2574    _available_permits: ComputedUIntGauge,
2575}
2576
2577impl MetricsSemaphore {
2578    pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
2579        let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
2580        // TODO: Sadly, tokio::sync::Semaphore makes it difficult to have a
2581        // dynamic total_permits count.
2582        let semaphore = Arc::new(Semaphore::new(total_permits));
2583        MetricsSemaphore {
2584            name,
2585            total_permits,
2586            acquire_count: registry.register(metric!(
2587                name: "mz_persist_semaphore_acquire_count",
2588                help: "count of acquire calls (not acquired permits count)",
2589                const_labels: {"name" => name},
2590            )),
2591            blocking_count: registry.register(metric!(
2592                name: "mz_persist_semaphore_blocking_count",
2593                help: "count of acquire calls that had to block",
2594                const_labels: {"name" => name},
2595            )),
2596            blocking_seconds: registry.register(metric!(
2597                name: "mz_persist_semaphore_blocking_seconds",
2598                help: "total time spent blocking on permit acquisition",
2599                const_labels: {"name" => name},
2600            )),
2601            acquired_permits: registry.register(metric!(
2602                name: "mz_persist_semaphore_acquired_permits",
2603                help: "total sum of acquired permits",
2604                const_labels: {"name" => name},
2605            )),
2606            released_permits: registry.register(metric!(
2607                name: "mz_persist_semaphore_released_permits",
2608                help: "total sum of released permits",
2609                const_labels: {"name" => name},
2610            )),
2611            _available_permits: registry.register_computed_gauge(
2612                metric!(
2613                    name: "mz_persist_semaphore_available_permits",
2614                    help: "currently available permits according to the semaphore",
2615                ),
2616                {
2617                    let semaphore = Arc::clone(&semaphore);
2618                    move || u64::cast_from(semaphore.available_permits())
2619                },
2620            ),
2621            semaphore,
2622        }
2623    }
2624
2625    pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
2626        // HACK: Cap the request at the total permit count. This prevents
2627        // deadlock, even if the cfg gets set to some small value.
2628        let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
2629        let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
2630        let requested_permits = std::cmp::min(requested_permits, total_permits);
2631        let wrap = |_permit| {
2632            self.acquired_permits.inc_by(u64::from(requested_permits));
2633            MetricsPermits {
2634                _permit,
2635                released_metric: self.released_permits.clone(),
2636                count: requested_permits,
2637            }
2638        };
2639
2640        // Special-case non-blocking happy path.
2641        self.acquire_count.inc();
2642        match Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
2643            Ok(x) => return wrap(x),
2644            Err(_) => {}
2645        };
2646
2647        // Sad path, gotta block.
2648        self.blocking_count.inc();
2649        let start = Instant::now();
2650        let ret = Arc::clone(&self.semaphore)
2651            .acquire_many_owned(requested_permits)
2652            .instrument(info_span!("acquire_permits"))
2653            .await;
2654        let elapsed = start.elapsed();
2655        self.blocking_seconds.inc_by(elapsed.as_secs_f64());
2656        debug!(
2657            "acquisition of {} {} permits blocked for {:?}",
2658            self.name, requested_permits, elapsed
2659        );
2660        wrap(ret.expect("semaphore is never closed"))
2661    }
2662}
2663
2664#[derive(Debug)]
2665pub struct MetricsPermits {
2666    _permit: OwnedSemaphorePermit,
2667    released_metric: IntCounter,
2668    count: u32,
2669}
2670
2671impl Drop for MetricsPermits {
2672    fn drop(&mut self) {
2673        self.released_metric.inc_by(u64::from(self.count))
2674    }
2675}
2676
2677#[derive(Debug)]
2678pub struct ExternalOpMetrics {
2679    started: IntCounter,
2680    succeeded: IntCounter,
2681    failed: IntCounter,
2682    bytes: IntCounter,
2683    seconds: Counter,
2684    seconds_histogram: Option<Histogram>,
2685    alerts_metrics: Arc<AlertsMetrics>,
2686}
2687
2688impl ExternalOpMetrics {
2689    async fn run_op<R, F, OpFn, ErrFn>(
2690        &self,
2691        op_fn: OpFn,
2692        on_err_fn: ErrFn,
2693    ) -> Result<R, ExternalError>
2694    where
2695        F: std::future::Future<Output = Result<R, ExternalError>>,
2696        OpFn: FnOnce() -> F,
2697        ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
2698    {
2699        self.started.inc();
2700        let start = Instant::now();
2701        let res = op_fn().await;
2702        let elapsed_seconds = start.elapsed().as_secs_f64();
2703        self.seconds.inc_by(elapsed_seconds);
2704        if let Some(h) = &self.seconds_histogram {
2705            h.observe(elapsed_seconds);
2706        }
2707        match res.as_ref() {
2708            Ok(_) => self.succeeded.inc(),
2709            Err(err) => {
2710                self.failed.inc();
2711                on_err_fn(&self.alerts_metrics, err);
2712            }
2713        };
2714        res
2715    }
2716
2717    fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
2718        &'a self,
2719        op_fn: OpFn,
2720        mut on_err_fn: ErrFn,
2721    ) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
2722    where
2723        S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
2724        OpFn: FnOnce() -> S,
2725        ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
2726    {
2727        self.started.inc();
2728        let start = Instant::now();
2729        let mut stream = op_fn();
2730        stream! {
2731            let mut succeeded = true;
2732            while let Some(res) = stream.next().await {
2733                if let Err(err) = res.as_ref() {
2734                    on_err_fn(&self.alerts_metrics, err);
2735                    succeeded = false;
2736                }
2737                yield res;
2738            }
2739            if succeeded {
2740                self.succeeded.inc()
2741            } else {
2742                self.failed.inc()
2743            }
2744            let elapsed_seconds = start.elapsed().as_secs_f64();
2745            self.seconds.inc_by(elapsed_seconds);
2746            if let Some(h) = &self.seconds_histogram {
2747                h.observe(elapsed_seconds);
2748            }
2749        }
2750    }
2751}
2752
2753#[derive(Debug)]
2754pub struct BlobMetrics {
2755    set: ExternalOpMetrics,
2756    get: ExternalOpMetrics,
2757    list_keys: ExternalOpMetrics,
2758    delete: ExternalOpMetrics,
2759    restore: ExternalOpMetrics,
2760    delete_noop: IntCounter,
2761    blob_sizes: Histogram,
2762    pub rtt_latency: Gauge,
2763}
2764
2765#[derive(Debug)]
2766pub struct MetricsBlob {
2767    blob: Arc<dyn Blob>,
2768    metrics: Arc<Metrics>,
2769}
2770
2771impl MetricsBlob {
2772    pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
2773        MetricsBlob { blob, metrics }
2774    }
2775
2776    fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
2777        alerts_metrics.blob_failures.inc()
2778    }
2779}
2780
2781#[async_trait]
2782impl Blob for MetricsBlob {
2783    #[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
2784    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
2785        let res = self
2786            .metrics
2787            .blob
2788            .get
2789            .run_op(|| self.blob.get(key), Self::on_err)
2790            .await;
2791        if let Ok(Some(value)) = res.as_ref() {
2792            self.metrics
2793                .blob
2794                .get
2795                .bytes
2796                .inc_by(u64::cast_from(value.len()));
2797        }
2798        res
2799    }
2800
2801    #[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
2802    async fn list_keys_and_metadata(
2803        &self,
2804        key_prefix: &str,
2805        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
2806    ) -> Result<(), ExternalError> {
2807        let mut byte_total = 0;
2808        let mut instrumented = |blob_metadata: BlobMetadata| {
2809            // Track the size of the _keys_, not the blobs, so that we get a
2810            // sense for how much network bandwidth these calls are using.
2811            byte_total += blob_metadata.key.len();
2812            f(blob_metadata)
2813        };
2814
2815        let res = self
2816            .metrics
2817            .blob
2818            .list_keys
2819            .run_op(
2820                || {
2821                    self.blob
2822                        .list_keys_and_metadata(key_prefix, &mut instrumented)
2823                },
2824                Self::on_err,
2825            )
2826            .await;
2827
2828        self.metrics
2829            .blob
2830            .list_keys
2831            .bytes
2832            .inc_by(u64::cast_from(byte_total));
2833
2834        res
2835    }
2836
2837    #[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
2838    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
2839        let bytes = value.len();
2840        let res = self
2841            .metrics
2842            .blob
2843            .set
2844            .run_op(|| self.blob.set(key, value), Self::on_err)
2845            .await;
2846        if res.is_ok() {
2847            self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
2848            self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
2849        }
2850        res
2851    }
2852
2853    #[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
2854    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
2855        let bytes = self
2856            .metrics
2857            .blob
2858            .delete
2859            .run_op(|| self.blob.delete(key), Self::on_err)
2860            .await?;
2861        if let Some(bytes) = bytes {
2862            self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
2863        } else {
2864            self.metrics.blob.delete_noop.inc();
2865        }
2866        Ok(bytes)
2867    }
2868
2869    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
2870        self.metrics
2871            .blob
2872            .restore
2873            .run_op(|| self.blob.restore(key), Self::on_err)
2874            .await
2875    }
2876}
2877
2878#[derive(Debug)]
2879pub struct ConsensusMetrics {
2880    list_keys: ExternalOpMetrics,
2881    head: ExternalOpMetrics,
2882    compare_and_set: ExternalOpMetrics,
2883    scan: ExternalOpMetrics,
2884    truncate: ExternalOpMetrics,
2885    truncated_count: IntCounter,
2886    pub rtt_latency: Gauge,
2887}
2888
2889#[derive(Debug)]
2890pub struct MetricsConsensus {
2891    consensus: Arc<dyn Consensus>,
2892    metrics: Arc<Metrics>,
2893}
2894
2895impl MetricsConsensus {
2896    pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
2897        MetricsConsensus { consensus, metrics }
2898    }
2899
2900    fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
2901        // As of 2022-09-06, regular determinate errors are expected in
2902        // Consensus (i.e. "txn conflict, please retry"), so only count the
2903        // indeterminate ones.
2904        if let ExternalError::Indeterminate(_) = err {
2905            alerts_metrics.consensus_failures.inc()
2906        }
2907    }
2908}
2909
2910#[async_trait]
2911impl Consensus for MetricsConsensus {
2912    fn list_keys(&self) -> ResultStream<'_, String> {
2913        Box::pin(
2914            self.metrics
2915                .consensus
2916                .list_keys
2917                .run_stream(|| self.consensus.list_keys(), Self::on_err),
2918        )
2919    }
2920
2921    #[instrument(name = "consensus::head", fields(shard=key))]
2922    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
2923        let res = self
2924            .metrics
2925            .consensus
2926            .head
2927            .run_op(|| self.consensus.head(key), Self::on_err)
2928            .await;
2929        if let Ok(Some(data)) = res.as_ref() {
2930            self.metrics
2931                .consensus
2932                .head
2933                .bytes
2934                .inc_by(u64::cast_from(data.data.len()));
2935        }
2936        res
2937    }
2938
2939    #[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
2940    async fn compare_and_set(
2941        &self,
2942        key: &str,
2943        new: VersionedData,
2944    ) -> Result<CaSResult, ExternalError> {
2945        let bytes = new.data.len();
2946        let res = self
2947            .metrics
2948            .consensus
2949            .compare_and_set
2950            .run_op(|| self.consensus.compare_and_set(key, new), Self::on_err)
2951            .await;
2952        match res.as_ref() {
2953            Ok(CaSResult::Committed) => self
2954                .metrics
2955                .consensus
2956                .compare_and_set
2957                .bytes
2958                .inc_by(u64::cast_from(bytes)),
2959            Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
2960        }
2961        res
2962    }
2963
2964    #[instrument(name = "consensus::scan", fields(shard=key))]
2965    async fn scan(
2966        &self,
2967        key: &str,
2968        from: SeqNo,
2969        limit: usize,
2970    ) -> Result<Vec<VersionedData>, ExternalError> {
2971        let res = self
2972            .metrics
2973            .consensus
2974            .scan
2975            .run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
2976            .await;
2977        if let Ok(dataz) = res.as_ref() {
2978            let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
2979            self.metrics
2980                .consensus
2981                .scan
2982                .bytes
2983                .inc_by(u64::cast_from(bytes));
2984        }
2985        res
2986    }
2987
2988    #[instrument(name = "consensus::truncate", fields(shard=key))]
2989    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
2990        let metrics = &self.metrics.consensus;
2991        let deleted = metrics
2992            .truncate
2993            .run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
2994            .await?;
2995        if let Some(deleted) = deleted {
2996            metrics.truncated_count.inc_by(u64::cast_from(deleted));
2997        }
2998        Ok(deleted)
2999    }
3000}
3001
3002/// A standard set of metrics for an async task. Call [TaskMetrics::instrument_task] to instrument
3003/// a future and report its metrics for this task type.
3004#[derive(Debug, Clone)]
3005pub struct TaskMetrics {
3006    f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
3007    u64_gauges: Vec<(
3008        GenericGauge<AtomicU64>,
3009        fn(&tokio_metrics::TaskMetrics) -> u64,
3010    )>,
3011    monitor: TaskMonitor,
3012}
3013
3014impl TaskMetrics {
3015    pub fn new(name: &str) -> Self {
3016        let monitor = TaskMonitor::new();
3017        Self {
3018            f64_gauges: vec![
3019                (
3020                    Gauge::make_collector(metric!(
3021                        name: "mz_persist_task_total_idle_duration",
3022                        help: "Seconds of time spent idling, ie. waiting for a task to be woken up.",
3023                        const_labels: {"name" => name}
3024                    )),
3025                    |m| m.total_idle_duration.as_secs_f64(),
3026                ),
3027                (
3028                    Gauge::make_collector(metric!(
3029                        name: "mz_persist_task_total_scheduled_duration",
3030                        help: "Seconds of time spent scheduled, ie. ready to poll but not yet polled.",
3031                        const_labels: {"name" => name}
3032                    )),
3033                    |m| m.total_scheduled_duration.as_secs_f64(),
3034                ),
3035            ],
3036            u64_gauges: vec![
3037                (
3038                    MakeCollector::make_collector(metric!(
3039                        name: "mz_persist_task_total_scheduled_count",
3040                        help: "The total number of task schedules. Useful for computing the average scheduled time.",
3041                        const_labels: {"name" => name}
3042                    )),
3043                    |m| m.total_scheduled_count,
3044                ),
3045                (
3046                    MakeCollector::make_collector(metric!(
3047                        name: "mz_persist_task_total_idled_count",
3048                        help: "The total number of task idles. Useful for computing the average idle time.",
3049                        const_labels: {"name" => name}
3050                    ,
3051                    )),
3052                    |m| m.total_idled_count,
3053                ),
3054            ],
3055            monitor,
3056        }
3057    }
3058
3059    /// Instrument the provided future. The expectation is that the result will be executed
3060    /// as a task. (See [TaskMonitor::instrument] for more context.)
3061    pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
3062        TaskMonitor::instrument(&self.monitor, task)
3063    }
3064}
3065
3066impl Collector for TaskMetrics {
3067    fn desc(&self) -> Vec<&Desc> {
3068        let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3069        for (g, _) in &self.f64_gauges {
3070            descs.extend(g.desc());
3071        }
3072        for (g, _) in &self.u64_gauges {
3073            descs.extend(g.desc());
3074        }
3075        descs
3076    }
3077
3078    fn collect(&self) -> Vec<MetricFamily> {
3079        let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3080        let metrics = self.monitor.cumulative();
3081        for (g, metrics_fn) in &self.f64_gauges {
3082            g.set(metrics_fn(&metrics));
3083            families.extend(g.collect());
3084        }
3085        for (g, metrics_fn) in &self.u64_gauges {
3086            g.set(metrics_fn(&metrics));
3087            families.extend(g.collect());
3088        }
3089        families
3090    }
3091}
3092
3093#[derive(Debug)]
3094pub struct TasksMetrics {
3095    pub heartbeat_read: TaskMetrics,
3096}
3097
3098impl TasksMetrics {
3099    fn new(registry: &MetricsRegistry) -> Self {
3100        let heartbeat_read = TaskMetrics::new("heartbeat_read");
3101        registry.register_collector(heartbeat_read.clone());
3102        TasksMetrics { heartbeat_read }
3103    }
3104}
3105
3106#[derive(Debug)]
3107pub struct SchemaMetrics {
3108    pub(crate) cache_fetch_state_count: IntCounter,
3109    pub(crate) cache_schema: SchemaCacheMetrics,
3110    pub(crate) cache_migration: SchemaCacheMetrics,
3111    pub(crate) migration_count_same: IntCounter,
3112    pub(crate) migration_count_codec: IntCounter,
3113    pub(crate) migration_count_either: IntCounter,
3114    pub(crate) migration_len_legacy_codec: IntCounter,
3115    pub(crate) migration_len_either_codec: IntCounter,
3116    pub(crate) migration_len_either_arrow: IntCounter,
3117    pub(crate) migration_new_count: IntCounter,
3118    pub(crate) migration_new_seconds: Counter,
3119    pub(crate) migration_migrate_seconds: Counter,
3120}
3121
3122impl SchemaMetrics {
3123    fn new(registry: &MetricsRegistry) -> Self {
3124        let cached: IntCounterVec = registry.register(metric!(
3125            name: "mz_persist_schema_cache_cached_count",
3126            help: "count of schema cache entries served from cache",
3127            var_labels: ["op"],
3128        ));
3129        let computed: IntCounterVec = registry.register(metric!(
3130            name: "mz_persist_schema_cache_computed_count",
3131            help: "count of schema cache entries computed",
3132            var_labels: ["op"],
3133        ));
3134        let unavailable: IntCounterVec = registry.register(metric!(
3135            name: "mz_persist_schema_cache_unavailable_count",
3136            help: "count of schema cache entries unavailable at current state",
3137            var_labels: ["op"],
3138        ));
3139        let added: IntCounterVec = registry.register(metric!(
3140            name: "mz_persist_schema_cache_added_count",
3141            help: "count of schema cache entries added",
3142            var_labels: ["op"],
3143        ));
3144        let dropped: IntCounterVec = registry.register(metric!(
3145            name: "mz_persist_schema_cache_dropped_count",
3146            help: "count of schema cache entries dropped",
3147            var_labels: ["op"],
3148        ));
3149        let cache = |name| SchemaCacheMetrics {
3150            cached_count: cached.with_label_values(&[name]),
3151            computed_count: computed.with_label_values(&[name]),
3152            unavailable_count: unavailable.with_label_values(&[name]),
3153            added_count: added.with_label_values(&[name]),
3154            dropped_count: dropped.with_label_values(&[name]),
3155        };
3156        let migration_count: IntCounterVec = registry.register(metric!(
3157            name: "mz_persist_schema_migration_count",
3158            help: "count of fetch part migrations",
3159            var_labels: ["op"],
3160        ));
3161        let migration_len: IntCounterVec = registry.register(metric!(
3162            name: "mz_persist_schema_migration_len",
3163            help: "count of migrated update records",
3164            var_labels: ["op"],
3165        ));
3166        SchemaMetrics {
3167            cache_fetch_state_count: registry.register(metric!(
3168                name: "mz_persist_schema_cache_fetch_state_count",
3169                help: "count of state fetches by the schema cache",
3170            )),
3171            cache_schema: cache("schema"),
3172            cache_migration: cache("migration"),
3173            migration_count_same: migration_count.with_label_values(&["same"]),
3174            migration_count_codec: migration_count.with_label_values(&["codec"]),
3175            migration_count_either: migration_count.with_label_values(&["either"]),
3176            migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
3177            migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
3178            migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
3179            migration_new_count: registry.register(metric!(
3180                name: "mz_persist_schema_migration_new_count",
3181                help: "count of migrations constructed",
3182            )),
3183            migration_new_seconds: registry.register(metric!(
3184                name: "mz_persist_schema_migration_new_seconds",
3185                help: "seconds spent constructing migration logic",
3186            )),
3187            migration_migrate_seconds: registry.register(metric!(
3188                name: "mz_persist_schema_migration_migrate_seconds",
3189                help: "seconds spent applying migration logic",
3190            )),
3191        }
3192    }
3193}
3194
3195#[derive(Debug, Clone)]
3196pub struct SchemaCacheMetrics {
3197    pub(crate) cached_count: IntCounter,
3198    pub(crate) computed_count: IntCounter,
3199    pub(crate) unavailable_count: IntCounter,
3200    pub(crate) added_count: IntCounter,
3201    pub(crate) dropped_count: IntCounter,
3202}
3203
3204#[derive(Debug)]
3205pub struct InlineMetrics {
3206    pub(crate) part_commit_count: IntCounter,
3207    pub(crate) part_commit_bytes: IntCounter,
3208    pub(crate) backpressure: BatchWriteMetrics,
3209}
3210
3211impl InlineMetrics {
3212    fn new(registry: &MetricsRegistry) -> Self {
3213        InlineMetrics {
3214            part_commit_count: registry.register(metric!(
3215                name: "mz_persist_inline_part_commit_count",
3216                help: "count of inline parts committed to state",
3217            )),
3218            part_commit_bytes: registry.register(metric!(
3219                name: "mz_persist_inline_part_commit_bytes",
3220                help: "total size of of inline parts committed to state",
3221            )),
3222            backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
3223        }
3224    }
3225}
3226
3227fn blob_key_shard_id(key: &str) -> Option<String> {
3228    let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
3229    Some(shard_id.to_string())
3230}
3231
3232/// Encode a frontier into an i64 acceptable for use in metrics.
3233pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
3234    // We have two problems in mapping a persist frontier into a metric.
3235    // First is that we only have a `T: Timestamp+Codec64`. Second, is
3236    // mapping an antichain to a single counter value. We solve both by
3237    // taking advantage of the fact that in practice, timestamps in mz are
3238    // currently always a u64 (and if we switch them, it will be to an i64).
3239    // This means that for all values that mz would actually produce,
3240    // interpreting the encoded bytes as a little-endian i64 will work.
3241    // Both of them impl PartialOrder, so in practice, there will always be
3242    // zero or one elements in the antichain.
3243    match ts.elements().first() {
3244        Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
3245        None => i64::MAX,
3246    }
3247}