mz_persist_client/internal/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Prometheus monitoring metrics.
11
12use async_stream::stream;
13use mz_persist_types::stats::PartStatsMetrics;
14use std::collections::BTreeMap;
15use std::sync::{Arc, Mutex, Weak};
16use std::time::{Duration, Instant};
17use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::instrument;
25use mz_ore::metric;
26use mz_ore::metrics::{
27    ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
28    DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
29    UIntGaugeVec, raw,
30};
31use mz_ore::stats::histogram_seconds_buckets;
32use mz_persist::location::{
33    Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
34};
35use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
36use mz_persist::retry::RetryStream;
37use mz_persist_types::Codec64;
38use mz_postgres_client::metrics::PostgresClientMetrics;
39use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
40use prometheus::proto::MetricFamily;
41use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
42use timely::progress::Antichain;
43use tokio_metrics::TaskMonitor;
44use tracing::{Instrument, debug, info, info_span};
45
46use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
47use crate::internal::paths::BlobKey;
48use crate::{PersistConfig, ShardId};
49
50/// Prometheus monitoring metrics.
51///
52/// Intentionally not Clone because we expect this to be passed around in an
53/// Arc.
54pub struct Metrics {
55    _vecs: MetricsVecs,
56    _uptime: ComputedGauge,
57
58    /// Metrics for [Blob] usage.
59    pub blob: BlobMetrics,
60    /// Metrics for [Consensus] usage.
61    pub consensus: ConsensusMetrics,
62    /// Metrics of command evaluation.
63    pub cmds: CmdsMetrics,
64    /// Metrics for each retry loop.
65    pub retries: RetriesMetrics,
66    /// Metrics for batches written directly on behalf of a user (BatchBuilder
67    /// or one of the sugar methods that use it).
68    pub user: BatchWriteMetrics,
69    /// Metrics for reading batch parts
70    pub read: BatchPartReadMetrics,
71    /// Metrics for compaction.
72    pub compaction: CompactionMetrics,
73    /// Metrics for garbage collection.
74    pub gc: GcMetrics,
75    /// Metrics for leasing and automatic lease expiry.
76    pub lease: LeaseMetrics,
77    /// Metrics for various encodings and decodings.
78    pub codecs: CodecsMetrics,
79    /// Metrics for (incremental) state updates and fetches.
80    pub state: StateMetrics,
81    /// Metrics for various per-shard measurements.
82    pub shards: ShardsMetrics,
83    /// Metrics for auditing persist usage
84    pub audit: UsageAuditMetrics,
85    /// Metrics for locking.
86    pub locks: LocksMetrics,
87    /// Metrics for StateWatch.
88    pub watch: WatchMetrics,
89    /// Metrics for PubSub client.
90    pub pubsub_client: PubSubClientMetrics,
91    /// Metrics for mfp/filter pushdown.
92    pub pushdown: PushdownMetrics,
93    /// Metrics for consolidation.
94    pub consolidation: ConsolidationMetrics,
95    /// Metrics for blob caching.
96    pub blob_cache_mem: BlobMemCache,
97    /// Metrics for tokio tasks.
98    pub tasks: TasksMetrics,
99    /// Metrics for columnar data encoding and decoding.
100    pub columnar: ColumnarMetrics,
101    /// Metrics for schemas and the schema registry.
102    pub schema: SchemaMetrics,
103    /// Metrics for inline writes.
104    pub inline: InlineMetrics,
105    /// Semaphore to limit memory/disk use by fetches.
106    pub(crate) semaphore: SemaphoreMetrics,
107
108    /// Metrics for the persist sink.
109    pub sink: SinkMetrics,
110
111    /// Metrics for S3-backed blob implementation
112    pub s3_blob: S3BlobMetrics,
113    /// Metrics for Postgres-backed consensus implementation
114    pub postgres_consensus: PostgresClientMetrics,
115
116    #[allow(dead_code)]
117    pub(crate) registry: MetricsRegistry,
118}
119
120impl std::fmt::Debug for Metrics {
121    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122        f.debug_struct("Metrics").finish_non_exhaustive()
123    }
124}
125
126impl Metrics {
127    /// Returns a new [Metrics] instance connected to the given registry.
128    pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
129        let vecs = MetricsVecs::new(registry);
130        let start = Instant::now();
131        let uptime = registry.register_computed_gauge(
132            metric!(
133                name: "mz_persist_metadata_seconds",
134                help: "server uptime, labels are build metadata",
135                const_labels: {
136                    "version" => cfg.build_version,
137                    "build_type" => if cfg!(release) { "release" } else { "debug" }
138                },
139            ),
140            move || start.elapsed().as_secs_f64(),
141        );
142        let s3_blob = S3BlobMetrics::new(registry);
143        let columnar = ColumnarMetrics::new(
144            registry,
145            &s3_blob.lgbytes,
146            Arc::clone(&cfg.configs),
147            cfg.is_cc_active,
148        );
149        Metrics {
150            blob: vecs.blob_metrics(),
151            consensus: vecs.consensus_metrics(),
152            cmds: vecs.cmds_metrics(registry),
153            retries: vecs.retries_metrics(),
154            codecs: vecs.codecs_metrics(),
155            user: BatchWriteMetrics::new(registry, "user"),
156            read: vecs.batch_part_read_metrics(),
157            compaction: CompactionMetrics::new(registry),
158            gc: GcMetrics::new(registry),
159            lease: LeaseMetrics::new(registry),
160            state: StateMetrics::new(registry),
161            shards: ShardsMetrics::new(registry),
162            audit: UsageAuditMetrics::new(registry),
163            locks: vecs.locks_metrics(),
164            watch: WatchMetrics::new(registry),
165            pubsub_client: PubSubClientMetrics::new(registry),
166            pushdown: PushdownMetrics::new(registry),
167            consolidation: ConsolidationMetrics::new(registry),
168            blob_cache_mem: BlobMemCache::new(registry),
169            tasks: TasksMetrics::new(registry),
170            columnar,
171            schema: SchemaMetrics::new(registry),
172            inline: InlineMetrics::new(registry),
173            semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
174            sink: SinkMetrics::new(registry),
175            s3_blob,
176            postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
177            _vecs: vecs,
178            _uptime: uptime,
179            registry: registry.clone(),
180        }
181    }
182
183    /// Returns the current lifetime write amplification reflected in these
184    /// metrics.
185    ///
186    /// Only exposed for tests, persistcli, and benchmarks.
187    pub fn write_amplification(&self) -> f64 {
188        // This intentionally uses "bytes" for total and "goodbytes" for user so
189        // that the overhead of our blob format is included.
190        let total_written = self.blob.set.bytes.get();
191        let user_written = self.user.goodbytes.get();
192        #[allow(clippy::as_conversions)]
193        {
194            total_written as f64 / user_written as f64
195        }
196    }
197}
198
199#[derive(Debug)]
200struct MetricsVecs {
201    cmd_started: IntCounterVec,
202    cmd_cas_mismatch: IntCounterVec,
203    cmd_succeeded: IntCounterVec,
204    cmd_failed: IntCounterVec,
205    cmd_seconds: CounterVec,
206
207    external_op_started: IntCounterVec,
208    external_op_succeeded: IntCounterVec,
209    external_op_failed: IntCounterVec,
210    external_op_bytes: IntCounterVec,
211    external_op_seconds: CounterVec,
212    external_consensus_truncated_count: IntCounter,
213    external_blob_delete_noop_count: IntCounter,
214    external_blob_sizes: Histogram,
215    external_rtt_latency: GaugeVec,
216    external_op_latency: HistogramVec,
217
218    retry_started: IntCounterVec,
219    retry_finished: IntCounterVec,
220    retry_retries: IntCounterVec,
221    retry_sleep_seconds: CounterVec,
222
223    encode_count: IntCounterVec,
224    encode_seconds: CounterVec,
225    decode_count: IntCounterVec,
226    decode_seconds: CounterVec,
227
228    read_part_bytes: IntCounterVec,
229    read_part_goodbytes: IntCounterVec,
230    read_part_count: IntCounterVec,
231    read_part_seconds: CounterVec,
232    read_ts_rewrite: IntCounterVec,
233
234    lock_acquire_count: IntCounterVec,
235    lock_blocking_acquire_count: IntCounterVec,
236    lock_blocking_seconds: CounterVec,
237
238    /// A minimal set of metrics imported into honeycomb for alerting.
239    alerts_metrics: Arc<AlertsMetrics>,
240}
241
242impl MetricsVecs {
243    fn new(registry: &MetricsRegistry) -> Self {
244        MetricsVecs {
245            cmd_started: registry.register(metric!(
246                name: "mz_persist_cmd_started_count",
247                help: "count of commands started",
248                var_labels: ["cmd"],
249            )),
250            cmd_cas_mismatch: registry.register(metric!(
251                name: "mz_persist_cmd_cas_mismatch_count",
252                help: "count of command retries from CaS mismatch",
253                var_labels: ["cmd"],
254            )),
255            cmd_succeeded: registry.register(metric!(
256                name: "mz_persist_cmd_succeeded_count",
257                help: "count of commands succeeded",
258                var_labels: ["cmd"],
259            )),
260            cmd_failed: registry.register(metric!(
261                name: "mz_persist_cmd_failed_count",
262                help: "count of commands failed",
263                var_labels: ["cmd"],
264            )),
265            cmd_seconds: registry.register(metric!(
266                name: "mz_persist_cmd_seconds",
267                help: "time spent applying commands",
268                var_labels: ["cmd"],
269            )),
270
271            external_op_started: registry.register(metric!(
272                name: "mz_persist_external_started_count",
273                help: "count of external service calls started",
274                var_labels: ["op"],
275            )),
276            external_op_succeeded: registry.register(metric!(
277                name: "mz_persist_external_succeeded_count",
278                help: "count of external service calls succeeded",
279                var_labels: ["op"],
280            )),
281            external_op_failed: registry.register(metric!(
282                name: "mz_persist_external_failed_count",
283                help: "count of external service calls failed",
284                var_labels: ["op"],
285            )),
286            external_op_bytes: registry.register(metric!(
287                name: "mz_persist_external_bytes_count",
288                help: "total size represented by external service calls",
289                var_labels: ["op"],
290            )),
291            external_op_seconds: registry.register(metric!(
292                name: "mz_persist_external_seconds",
293                help: "time spent in external service calls",
294                var_labels: ["op"],
295            )),
296            external_consensus_truncated_count: registry.register(metric!(
297                name: "mz_persist_external_consensus_truncated_count",
298                help: "count of versions deleted by consensus truncate calls",
299            )),
300            external_blob_delete_noop_count: registry.register(metric!(
301                name: "mz_persist_external_blob_delete_noop_count",
302                help: "count of blob delete calls that deleted a non-existent key",
303            )),
304            external_blob_sizes: registry.register(metric!(
305                name: "mz_persist_external_blob_sizes",
306                help: "histogram of blob sizes at put time",
307                buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
308            )),
309            external_rtt_latency: registry.register(metric!(
310                name: "mz_persist_external_rtt_latency",
311                help: "roundtrip-time to external service as seen by this process",
312                var_labels: ["external"],
313            )),
314            external_op_latency: registry.register(metric!(
315                name: "mz_persist_external_op_latency",
316                help: "rountrip latency observed by individual performance-critical operations",
317                var_labels: ["op"],
318                // NB: If we end up overrunning metrics quotas, we could plausibly cut this
319                // down by switching to a factor of 4 between buckets (vs. the standard 2).
320                buckets: histogram_seconds_buckets(0.000_500, 32.0),
321            )),
322
323            retry_started: registry.register(metric!(
324                name: "mz_persist_retry_started_count",
325                help: "count of retry loops started",
326                var_labels: ["op"],
327            )),
328            retry_finished: registry.register(metric!(
329                name: "mz_persist_retry_finished_count",
330                help: "count of retry loops finished",
331                var_labels: ["op"],
332            )),
333            retry_retries: registry.register(metric!(
334                name: "mz_persist_retry_retries_count",
335                help: "count of total attempts by retry loops",
336                var_labels: ["op"],
337            )),
338            retry_sleep_seconds: registry.register(metric!(
339                name: "mz_persist_retry_sleep_seconds",
340                help: "time spent in retry loop backoff",
341                var_labels: ["op"],
342            )),
343
344            encode_count: registry.register(metric!(
345                name: "mz_persist_encode_count",
346                help: "count of op encodes",
347                var_labels: ["op"],
348            )),
349            encode_seconds: registry.register(metric!(
350                name: "mz_persist_encode_seconds",
351                help: "time spent in op encodes",
352                var_labels: ["op"],
353            )),
354            decode_count: registry.register(metric!(
355                name: "mz_persist_decode_count",
356                help: "count of op decodes",
357                var_labels: ["op"],
358            )),
359            decode_seconds: registry.register(metric!(
360                name: "mz_persist_decode_seconds",
361                help: "time spent in op decodes",
362                var_labels: ["op"],
363            )),
364
365            read_part_bytes: registry.register(metric!(
366                name: "mz_persist_read_batch_part_bytes",
367                help: "total encoded size of batch parts read",
368                var_labels: ["op"],
369            )),
370            read_part_goodbytes: registry.register(metric!(
371                name: "mz_persist_read_batch_part_goodbytes",
372                help: "total logical size of batch parts read",
373                var_labels: ["op"],
374            )),
375            read_part_count: registry.register(metric!(
376                name: "mz_persist_read_batch_part_count",
377                help: "count of batch parts read",
378                var_labels: ["op"],
379            )),
380            read_part_seconds: registry.register(metric!(
381                name: "mz_persist_read_batch_part_seconds",
382                help: "time spent reading batch parts",
383                var_labels: ["op"],
384            )),
385            read_ts_rewrite: registry.register(metric!(
386                name: "mz_persist_read_ts_rewite",
387                help: "count of updates read with rewritten ts",
388                var_labels: ["op"],
389            )),
390
391            lock_acquire_count: registry.register(metric!(
392                name: "mz_persist_lock_acquire_count",
393                help: "count of locks acquired",
394                var_labels: ["op"],
395            )),
396            lock_blocking_acquire_count: registry.register(metric!(
397                name: "mz_persist_lock_blocking_acquire_count",
398                help: "count of locks acquired that required blocking",
399                var_labels: ["op"],
400            )),
401            lock_blocking_seconds: registry.register(metric!(
402                name: "mz_persist_lock_blocking_seconds",
403                help: "time spent blocked for a lock",
404                var_labels: ["op"],
405            )),
406
407            alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
408        }
409    }
410
411    fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
412        CmdsMetrics {
413            init_state: self.cmd_metrics("init_state"),
414            add_rollup: self.cmd_metrics("add_rollup"),
415            remove_rollups: self.cmd_metrics("remove_rollups"),
416            register: self.cmd_metrics("register"),
417            compare_and_append: self.cmd_metrics("compare_and_append"),
418            compare_and_append_noop:             registry.register(metric!(
419                name: "mz_persist_cmd_compare_and_append_noop",
420                help: "count of compare_and_append retries that were discoverd to have already committed",
421            )),
422            compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
423            downgrade_since: self.cmd_metrics("downgrade_since"),
424            heartbeat_reader: self.cmd_metrics("heartbeat_reader"),
425            expire_reader: self.cmd_metrics("expire_reader"),
426            expire_writer: self.cmd_metrics("expire_writer"),
427            merge_res: self.cmd_metrics("merge_res"),
428            become_tombstone: self.cmd_metrics("become_tombstone"),
429            compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
430            spine_exert: self.cmd_metrics("spine_exert"),
431            fetch_upper_count: registry.register(metric!(
432                name: "mz_persist_cmd_fetch_upper_count",
433                help: "count of fetch_upper calls",
434            ))
435        }
436    }
437
438    fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
439        CmdMetrics {
440            name: cmd.to_owned(),
441            started: self.cmd_started.with_label_values(&[cmd]),
442            succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
443            cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
444            failed: self.cmd_failed.with_label_values(&[cmd]),
445            seconds: self.cmd_seconds.with_label_values(&[cmd]),
446        }
447    }
448
449    fn retries_metrics(&self) -> RetriesMetrics {
450        RetriesMetrics {
451            determinate: RetryDeterminate {
452                apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
453            },
454            external: RetryExternal {
455                batch_delete: Arc::new(self.retry_metrics("batch::delete")),
456                batch_set: self.retry_metrics("batch::set"),
457                blob_open: self.retry_metrics("blob::open"),
458                compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
459                consensus_open: self.retry_metrics("consensus::open"),
460                fetch_batch_get: self.retry_metrics("fetch_batch::get"),
461                fetch_state_scan: self.retry_metrics("fetch_state::scan"),
462                gc_truncate: self.retry_metrics("gc::truncate"),
463                maybe_init_cas: self.retry_metrics("maybe_init::cas"),
464                rollup_delete: self.retry_metrics("rollup::delete"),
465                rollup_get: self.retry_metrics("rollup::get"),
466                rollup_set: self.retry_metrics("rollup::set"),
467                hollow_run_get: self.retry_metrics("hollow_run::get"),
468                hollow_run_set: self.retry_metrics("hollow_run::set"),
469                storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
470            },
471            compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
472            fetch_latest_state: self.retry_metrics("fetch_latest_state"),
473            fetch_live_states: self.retry_metrics("fetch_live_states"),
474            idempotent_cmd: self.retry_metrics("idempotent_cmd"),
475            next_listen_batch: self.retry_metrics("next_listen_batch"),
476            snapshot: self.retry_metrics("snapshot"),
477        }
478    }
479
480    fn retry_metrics(&self, name: &str) -> RetryMetrics {
481        RetryMetrics {
482            name: name.to_owned(),
483            started: self.retry_started.with_label_values(&[name]),
484            finished: self.retry_finished.with_label_values(&[name]),
485            retries: self.retry_retries.with_label_values(&[name]),
486            sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
487        }
488    }
489
490    fn codecs_metrics(&self) -> CodecsMetrics {
491        CodecsMetrics {
492            state: self.codec_metrics("state"),
493            state_diff: self.codec_metrics("state_diff"),
494            batch: self.codec_metrics("batch"),
495            key: self.codec_metrics("key"),
496            val: self.codec_metrics("val"),
497        }
498    }
499
500    fn codec_metrics(&self, op: &str) -> CodecMetrics {
501        CodecMetrics {
502            encode_count: self.encode_count.with_label_values(&[op]),
503            encode_seconds: self.encode_seconds.with_label_values(&[op]),
504            decode_count: self.decode_count.with_label_values(&[op]),
505            decode_seconds: self.decode_seconds.with_label_values(&[op]),
506        }
507    }
508
509    fn blob_metrics(&self) -> BlobMetrics {
510        BlobMetrics {
511            set: self.external_op_metrics("blob_set", true),
512            get: self.external_op_metrics("blob_get", true),
513            list_keys: self.external_op_metrics("blob_list_keys", false),
514            delete: self.external_op_metrics("blob_delete", false),
515            restore: self.external_op_metrics("restore", false),
516            delete_noop: self.external_blob_delete_noop_count.clone(),
517            blob_sizes: self.external_blob_sizes.clone(),
518            rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
519        }
520    }
521
522    fn consensus_metrics(&self) -> ConsensusMetrics {
523        ConsensusMetrics {
524            list_keys: self.external_op_metrics("consensus_list_keys", false),
525            head: self.external_op_metrics("consensus_head", false),
526            compare_and_set: self.external_op_metrics("consensus_cas", true),
527            scan: self.external_op_metrics("consensus_scan", false),
528            truncate: self.external_op_metrics("consensus_truncate", false),
529            truncated_count: self.external_consensus_truncated_count.clone(),
530            rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
531        }
532    }
533
534    fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
535        ExternalOpMetrics {
536            started: self.external_op_started.with_label_values(&[op]),
537            succeeded: self.external_op_succeeded.with_label_values(&[op]),
538            failed: self.external_op_failed.with_label_values(&[op]),
539            bytes: self.external_op_bytes.with_label_values(&[op]),
540            seconds: self.external_op_seconds.with_label_values(&[op]),
541            seconds_histogram: if latency_histogram {
542                Some(self.external_op_latency.with_label_values(&[op]))
543            } else {
544                None
545            },
546            alerts_metrics: Arc::clone(&self.alerts_metrics),
547        }
548    }
549
550    fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
551        BatchPartReadMetrics {
552            listen: self.read_metrics("listen"),
553            snapshot: self.read_metrics("snapshot"),
554            batch_fetcher: self.read_metrics("batch_fetcher"),
555            compaction: self.read_metrics("compaction"),
556            unindexed: self.read_metrics("unindexed"),
557        }
558    }
559
560    fn read_metrics(&self, op: &str) -> ReadMetrics {
561        ReadMetrics {
562            part_bytes: self.read_part_bytes.with_label_values(&[op]),
563            part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
564            part_count: self.read_part_count.with_label_values(&[op]),
565            seconds: self.read_part_seconds.with_label_values(&[op]),
566            ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
567        }
568    }
569
570    fn locks_metrics(&self) -> LocksMetrics {
571        LocksMetrics {
572            applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
573            applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
574            applier_write: self.lock_metrics("applier_write"),
575            watch: self.lock_metrics("watch"),
576        }
577    }
578
579    fn lock_metrics(&self, op: &str) -> LockMetrics {
580        LockMetrics {
581            acquire_count: self.lock_acquire_count.with_label_values(&[op]),
582            blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
583            blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
584        }
585    }
586}
587
588#[derive(Debug)]
589pub struct CmdMetrics {
590    pub(crate) name: String,
591    pub(crate) started: IntCounter,
592    pub(crate) cas_mismatch: IntCounter,
593    pub(crate) succeeded: IntCounter,
594    pub(crate) failed: IntCounter,
595    pub(crate) seconds: Counter,
596}
597
598impl CmdMetrics {
599    pub async fn run_cmd<R, E, F, CmdFn>(
600        &self,
601        shard_metrics: &ShardMetrics,
602        cmd_fn: CmdFn,
603    ) -> Result<R, E>
604    where
605        F: std::future::Future<Output = Result<R, E>>,
606        CmdFn: FnOnce() -> F,
607    {
608        self.started.inc();
609        let start = Instant::now();
610        let res = cmd_fn().await;
611        self.seconds.inc_by(start.elapsed().as_secs_f64());
612        match res.as_ref() {
613            Ok(_) => {
614                self.succeeded.inc();
615                shard_metrics.cmd_succeeded.inc();
616            }
617            Err(_) => self.failed.inc(),
618        };
619        res
620    }
621}
622
623#[derive(Debug)]
624pub struct CmdsMetrics {
625    pub(crate) init_state: CmdMetrics,
626    pub(crate) add_rollup: CmdMetrics,
627    pub(crate) remove_rollups: CmdMetrics,
628    pub(crate) register: CmdMetrics,
629    pub(crate) compare_and_append: CmdMetrics,
630    pub(crate) compare_and_append_noop: IntCounter,
631    pub(crate) compare_and_downgrade_since: CmdMetrics,
632    pub(crate) downgrade_since: CmdMetrics,
633    pub(crate) heartbeat_reader: CmdMetrics,
634    pub(crate) expire_reader: CmdMetrics,
635    pub(crate) expire_writer: CmdMetrics,
636    pub(crate) merge_res: CmdMetrics,
637    pub(crate) become_tombstone: CmdMetrics,
638    pub(crate) compare_and_evolve_schema: CmdMetrics,
639    pub(crate) spine_exert: CmdMetrics,
640    pub(crate) fetch_upper_count: IntCounter,
641}
642
643#[derive(Debug)]
644pub struct RetryMetrics {
645    pub(crate) name: String,
646    pub(crate) started: IntCounter,
647    pub(crate) finished: IntCounter,
648    pub(crate) retries: IntCounter,
649    pub(crate) sleep_seconds: Counter,
650}
651
652impl RetryMetrics {
653    pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
654        MetricsRetryStream::new(retry, self)
655    }
656}
657
658#[derive(Debug)]
659pub struct RetryDeterminate {
660    pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
661}
662
663#[derive(Debug)]
664pub struct RetryExternal {
665    pub(crate) batch_delete: Arc<RetryMetrics>,
666    pub(crate) batch_set: RetryMetrics,
667    pub(crate) blob_open: RetryMetrics,
668    pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
669    pub(crate) consensus_open: RetryMetrics,
670    pub(crate) fetch_batch_get: RetryMetrics,
671    pub(crate) fetch_state_scan: RetryMetrics,
672    pub(crate) gc_truncate: RetryMetrics,
673    pub(crate) maybe_init_cas: RetryMetrics,
674    pub(crate) rollup_delete: RetryMetrics,
675    pub(crate) rollup_get: RetryMetrics,
676    pub(crate) rollup_set: RetryMetrics,
677    pub(crate) hollow_run_get: RetryMetrics,
678    pub(crate) hollow_run_set: RetryMetrics,
679    pub(crate) storage_usage_shard_size: RetryMetrics,
680}
681
682#[derive(Debug)]
683pub struct RetriesMetrics {
684    pub(crate) determinate: RetryDeterminate,
685    pub(crate) external: RetryExternal,
686
687    pub(crate) compare_and_append_idempotent: RetryMetrics,
688    pub(crate) fetch_latest_state: RetryMetrics,
689    pub(crate) fetch_live_states: RetryMetrics,
690    pub(crate) idempotent_cmd: RetryMetrics,
691    pub(crate) next_listen_batch: RetryMetrics,
692    pub(crate) snapshot: RetryMetrics,
693}
694
695#[derive(Debug)]
696pub struct BatchPartReadMetrics {
697    pub(crate) listen: ReadMetrics,
698    pub(crate) snapshot: ReadMetrics,
699    pub(crate) batch_fetcher: ReadMetrics,
700    pub(crate) compaction: ReadMetrics,
701    pub(crate) unindexed: ReadMetrics,
702}
703
704#[derive(Debug, Clone)]
705pub struct ReadMetrics {
706    pub(crate) part_bytes: IntCounter,
707    pub(crate) part_goodbytes: IntCounter,
708    pub(crate) part_count: IntCounter,
709    pub(crate) seconds: Counter,
710    pub(crate) ts_rewrite: IntCounter,
711}
712
713// This one is Clone in contrast to the others because it has to get moved into
714// a task.
715#[derive(Debug, Clone)]
716pub struct BatchWriteMetrics {
717    pub(crate) bytes: IntCounter,
718    pub(crate) goodbytes: IntCounter,
719    pub(crate) seconds: Counter,
720    pub(crate) write_stalls: IntCounter,
721    pub(crate) key_lower_too_big: IntCounter,
722
723    pub(crate) unordered: IntCounter,
724    pub(crate) codec_order: IntCounter,
725    pub(crate) structured_order: IntCounter,
726    _order_counts: IntCounterVec,
727
728    pub(crate) step_stats: Counter,
729    pub(crate) step_part_writing: Counter,
730    pub(crate) step_inline: Counter,
731}
732
733impl BatchWriteMetrics {
734    fn new(registry: &MetricsRegistry, name: &str) -> Self {
735        let order_counts: IntCounterVec = registry.register(metric!(
736                name: format!("mz_persist_{}_write_batch_order", name),
737                help: "count of batches by the data ordering",
738                var_labels: ["order"],
739        ));
740        let unordered = order_counts.with_label_values(&["unordered"]);
741        let codec_order = order_counts.with_label_values(&["codec"]);
742        let structured_order = order_counts.with_label_values(&["structured"]);
743
744        BatchWriteMetrics {
745            bytes: registry.register(metric!(
746                name: format!("mz_persist_{}_bytes", name),
747                help: format!("total encoded size of {} batches written", name),
748            )),
749            goodbytes: registry.register(metric!(
750                name: format!("mz_persist_{}_goodbytes", name),
751                help: format!("total logical size of {} batches written", name),
752            )),
753            seconds: registry.register(metric!(
754                name: format!("mz_persist_{}_write_batch_part_seconds", name),
755                help: format!("time spent writing {} batches", name),
756            )),
757            write_stalls: registry.register(metric!(
758                name: format!("mz_persist_{}_write_stall_count", name),
759                help: format!(
760                    "count of {} writes stalling to await max outstanding reqs",
761                    name
762                ),
763            )),
764            key_lower_too_big: registry.register(metric!(
765                name: format!("mz_persist_{}_key_lower_too_big", name),
766                help: format!(
767                    "count of {} writes that were unable to write a key lower, because the size threshold was too low",
768                    name
769                ),
770            )),
771            unordered,
772            codec_order,
773            structured_order,
774            _order_counts: order_counts,
775            step_stats: registry.register(metric!(
776                name: format!("mz_persist_{}_step_stats", name),
777                help: format!("time spent computing {} update stats", name),
778            )),
779            step_part_writing: registry.register(metric!(
780                name: format!("mz_persist_{}_step_part_writing", name),
781                help: format!("blocking time spent writing parts for {} updates", name),
782            )),
783            step_inline: registry.register(metric!(
784                name: format!("mz_persist_{}_step_inline", name),
785                help: format!("time spent encoding {} inline batches", name)
786            )),
787        }
788    }
789}
790
/// Process-wide metrics for compaction.
#[derive(Debug)]
pub struct CompactionMetrics {
    /// Count of total compaction requests.
    pub(crate) requested: IntCounter,
    /// Count of compaction requests dropped due to a full queue.
    pub(crate) dropped: IntCounter,
    /// Count of compaction requests dropped because compaction was disabled.
    pub(crate) disabled: IntCounter,
    /// Count of compactions skipped due to heuristics.
    pub(crate) skipped: IntCounter,
    /// Count of compactions started.
    pub(crate) started: IntCounter,
    /// Count of compactions applied to state.
    pub(crate) applied: IntCounter,
    /// Count of compactions that timed out.
    pub(crate) timed_out: IntCounter,
    /// Count of compactions that failed.
    pub(crate) failed: IntCounter,
    /// Count of compactions discarded as obsolete.
    pub(crate) noop: IntCounter,
    /// Time spent in compaction.
    pub(crate) seconds: Counter,
    /// Count of compaction requests that ever blocked due to the concurrency
    /// limit.
    pub(crate) concurrency_waits: IntCounter,
    /// Time that compaction requests spent queued.
    pub(crate) queued_seconds: Counter,
    /// Count of compaction memory requirement violations.
    pub(crate) memory_violations: IntCounter,
    /// Count of runs compacted.
    pub(crate) runs_compacted: IntCounter,
    /// Count of run chunks compacted.
    pub(crate) chunks_compacted: IntCounter,
    /// Count of compactions where not all inputs were prefetched.
    pub(crate) not_all_prefetched: IntCounter,
    /// Count of parts completely prefetched by the time they were needed.
    pub(crate) parts_prefetched: IntCounter,
    /// Count of parts that had to be waited on.
    pub(crate) parts_waited: IntCounter,
    /// Count of requests that could have used the fast-path optimization.
    pub(crate) fast_path_eligible: IntCounter,
    /// Count of compaction requests performed by admin tooling.
    pub(crate) admin_count: IntCounter,

    /// Count of merge results that exactly replaced a SpineBatch.
    pub(crate) applied_exact_match: IntCounter,
    /// Count of merge results that replaced a subset of a SpineBatch.
    pub(crate) applied_subset_match: IntCounter,
    /// Count of merge results not applied due to too many updates.
    pub(crate) not_applied_too_many_updates: IntCounter,

    /// Batch-write metrics for batches written by compaction
    /// (`mz_persist_compaction_...`).
    pub(crate) batch: BatchWriteMetrics,
    /// Per-step timings, children of `mz_persist_compaction_step_seconds`.
    pub(crate) steps: CompactionStepTimings,
    /// Counts of how schema selection was done, children of
    /// `mz_persist_compaction_schema_selection`.
    pub(crate) schema_selection: CompactionSchemaSelection,

    // The `step`-labelled vec that `steps` was built from.
    pub(crate) _steps_vec: CounterVec,
}
824
825impl CompactionMetrics {
826    fn new(registry: &MetricsRegistry) -> Self {
827        let step_timings: CounterVec = registry.register(metric!(
828                name: "mz_persist_compaction_step_seconds",
829                help: "time spent on individual steps of compaction",
830                var_labels: ["step"],
831        ));
832        let schema_selection: CounterVec = registry.register(metric!(
833            name: "mz_persist_compaction_schema_selection",
834            help: "count of compactions and how we did schema selection",
835            var_labels: ["selection"],
836        ));
837
838        CompactionMetrics {
839            requested: registry.register(metric!(
840                name: "mz_persist_compaction_requested",
841                help: "count of total compaction requests",
842            )),
843            dropped: registry.register(metric!(
844                name: "mz_persist_compaction_dropped",
845                help: "count of total compaction requests dropped due to a full queue",
846            )),
847            disabled: registry.register(metric!(
848                name: "mz_persist_compaction_disabled",
849                help: "count of total compaction requests dropped because compaction was disabled",
850            )),
851            skipped: registry.register(metric!(
852                name: "mz_persist_compaction_skipped",
853                help: "count of compactions skipped due to heuristics",
854            )),
855            started: registry.register(metric!(
856                name: "mz_persist_compaction_started",
857                help: "count of compactions started",
858            )),
859            failed: registry.register(metric!(
860                name: "mz_persist_compaction_failed",
861                help: "count of compactions failed",
862            )),
863            applied: registry.register(metric!(
864                name: "mz_persist_compaction_applied",
865                help: "count of compactions applied to state",
866            )),
867            timed_out: registry.register(metric!(
868                name: "mz_persist_compaction_timed_out",
869                help: "count of compactions that timed out",
870            )),
871            noop: registry.register(metric!(
872                name: "mz_persist_compaction_noop",
873                help: "count of compactions discarded (obsolete)",
874            )),
875            seconds: registry.register(metric!(
876                name: "mz_persist_compaction_seconds",
877                help: "time spent in compaction",
878            )),
879            concurrency_waits: registry.register(metric!(
880                name: "mz_persist_compaction_concurrency_waits",
881                help: "count of compaction requests that ever blocked due to concurrency limit",
882            )),
883            queued_seconds: registry.register(metric!(
884                name: "mz_persist_compaction_queued_seconds",
885                help: "time that compaction requests spent queued",
886            )),
887            memory_violations: registry.register(metric!(
888                name: "mz_persist_compaction_memory_violations",
889                help: "count of compaction memory requirement violations",
890            )),
891            runs_compacted: registry.register(metric!(
892                name: "mz_persist_compaction_runs_compacted",
893                help: "count of runs compacted",
894            )),
895            chunks_compacted: registry.register(metric!(
896                name: "mz_persist_compaction_chunks_compacted",
897                help: "count of run chunks compacted",
898            )),
899            not_all_prefetched: registry.register(metric!(
900                name: "mz_persist_compaction_not_all_prefetched",
901                help: "count of compactions where not all inputs were prefetched",
902            )),
903            parts_prefetched: registry.register(metric!(
904                name: "mz_persist_compaction_parts_prefetched",
905                help: "count of compaction parts completely prefetched by the time they're needed",
906            )),
907            parts_waited: registry.register(metric!(
908                name: "mz_persist_compaction_parts_waited",
909                help: "count of compaction parts that had to be waited on",
910            )),
911            fast_path_eligible: registry.register(metric!(
912                name: "mz_persist_compaction_fast_path_eligible",
913                help: "count of compaction requests that could have used the fast-path optimization",
914            )),
915            admin_count: registry.register(metric!(
916                name: "mz_persist_compaction_admin_count",
917                help: "count of compaction requests that were performed by admin tooling",
918            )),
919            applied_exact_match: registry.register(metric!(
920                name: "mz_persist_compaction_applied_exact_match",
921                help: "count of merge results that exactly replaced a SpineBatch",
922            )),
923            applied_subset_match: registry.register(metric!(
924                name: "mz_persist_compaction_applied_subset_match",
925                help: "count of merge results that replaced a subset of a SpineBatch",
926            )),
927            not_applied_too_many_updates: registry.register(metric!(
928                name: "mz_persist_compaction_not_applied_too_many_updates",
929                help: "count of merge results that did not apply due to too many updates",
930            )),
931            batch: BatchWriteMetrics::new(registry, "compaction"),
932            steps: CompactionStepTimings::new(step_timings.clone()),
933            schema_selection: CompactionSchemaSelection::new(schema_selection.clone()),
934            _steps_vec: step_timings,
935        }
936    }
937}
938
/// Timers for individual compaction steps.
///
/// Each timer is a child of the `step`-labelled
/// `mz_persist_compaction_step_seconds` vec.
#[derive(Debug)]
pub struct CompactionStepTimings {
    /// The `part_fetch` step.
    pub(crate) part_fetch_seconds: Counter,
    /// The `heap_population` step.
    pub(crate) heap_population_seconds: Counter,
}
944
945impl CompactionStepTimings {
946    fn new(step_timings: CounterVec) -> CompactionStepTimings {
947        CompactionStepTimings {
948            part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
949            heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
950        }
951    }
952}
953
/// Counts of compactions by how the schema was selected.
///
/// Each counter is a child of the `selection`-labelled
/// `mz_persist_compaction_schema_selection` vec.
#[derive(Debug)]
pub struct CompactionSchemaSelection {
    /// The `recent` selection label.
    pub(crate) recent_schema: Counter,
    /// The `none` selection label.
    pub(crate) no_schema: Counter,
}
959
960impl CompactionSchemaSelection {
961    fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
962        CompactionSchemaSelection {
963            recent_schema: schema_selection.with_label_values(&["recent"]),
964            no_schema: schema_selection.with_label_values(&["none"]),
965        }
966    }
967}
968
/// Process-wide metrics for garbage collection.
#[derive(Debug)]
pub struct GcMetrics {
    /// Count of garbage collections skipped because they were already done.
    pub(crate) noop: IntCounter,
    /// Count of garbage collections started.
    pub(crate) started: IntCounter,
    /// Count of garbage collections finished.
    pub(crate) finished: IntCounter,
    /// Count of garbage collection requests merged.
    pub(crate) merged: IntCounter,
    /// Time spent in garbage collections.
    pub(crate) seconds: Counter,
    /// Per-step timings, children of `mz_persist_gc_step_seconds`.
    pub(crate) steps: GcStepTimings,
}
978
/// Timers for individual garbage-collection steps.
///
/// Each timer is the child of the `step`-labelled `mz_persist_gc_step_seconds`
/// vec whose label is noted below.
#[derive(Debug)]
pub struct GcStepTimings {
    /// Label `find_removable_rollups`.
    pub(crate) find_removable_rollups: Counter,
    /// Label `fetch`.
    pub(crate) fetch_seconds: Counter,
    /// Label `find_deletable_blobs`.
    pub(crate) find_deletable_blobs_seconds: Counter,
    /// Label `delete_rollup`.
    pub(crate) delete_rollup_seconds: Counter,
    /// Label `delete_batch_part`.
    pub(crate) delete_batch_part_seconds: Counter,
    /// Label `truncate_diff`.
    pub(crate) truncate_diff_seconds: Counter,
    /// Label `remove_rollups_from_state`.
    pub(crate) remove_rollups_from_state: Counter,
    /// Label `post_gc_calculations`.
    pub(crate) post_gc_calculations_seconds: Counter,
}
990
991impl GcStepTimings {
992    fn new(step_timings: CounterVec) -> Self {
993        Self {
994            find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
995            fetch_seconds: step_timings.with_label_values(&["fetch"]),
996            find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
997            delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
998            delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
999            truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
1000            remove_rollups_from_state: step_timings
1001                .with_label_values(&["remove_rollups_from_state"]),
1002            post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
1003        }
1004    }
1005}
1006
1007impl GcMetrics {
1008    fn new(registry: &MetricsRegistry) -> Self {
1009        let step_timings: CounterVec = registry.register(metric!(
1010                name: "mz_persist_gc_step_seconds",
1011                help: "time spent on individual steps of gc",
1012                var_labels: ["step"],
1013        ));
1014        GcMetrics {
1015            noop: registry.register(metric!(
1016                name: "mz_persist_gc_noop",
1017                help: "count of garbage collections skipped because they were already done",
1018            )),
1019            started: registry.register(metric!(
1020                name: "mz_persist_gc_started",
1021                help: "count of garbage collections started",
1022            )),
1023            finished: registry.register(metric!(
1024                name: "mz_persist_gc_finished",
1025                help: "count of garbage collections finished",
1026            )),
1027            merged: registry.register(metric!(
1028                name: "mz_persist_gc_merged_reqs",
1029                help: "count of garbage collection requests merged",
1030            )),
1031            seconds: registry.register(metric!(
1032                name: "mz_persist_gc_seconds",
1033                help: "time spent in garbage collections",
1034            )),
1035            steps: GcStepTimings::new(step_timings),
1036        }
1037    }
1038}
1039
/// Metrics about reader leases.
#[derive(Debug)]
pub struct LeaseMetrics {
    /// Count of readers whose lease timed out.
    pub(crate) timeout_read: IntCounter,
    /// Count of LeasedBatchParts that were dropped without being politely
    /// returned.
    pub(crate) dropped_part: IntCounter,
}
1045
1046impl LeaseMetrics {
1047    fn new(registry: &MetricsRegistry) -> Self {
1048        LeaseMetrics {
1049            timeout_read: registry.register(metric!(
1050                name: "mz_persist_lease_timeout_read",
1051                help: "count of readers whose lease timed out",
1052            )),
1053            dropped_part: registry.register(metric!(
1054                name: "mz_persist_lease_dropped_part",
1055                help: "count of LeasedBatchParts that were dropped without being politely returned",
1056            )),
1057        }
1058    }
1059}
1060
1061struct IncOnDrop(IntCounter);
1062
1063impl Drop for IncOnDrop {
1064    fn drop(&mut self) {
1065        self.0.inc()
1066    }
1067}
1068
/// A [RetryStream] wrapper that records retry metrics.
///
/// Each sleep bumps `retries` and adds that sleep's duration to
/// `sleep_seconds`. The `finished` counter is bumped when the stream is
/// dropped.
pub struct MetricsRetryStream {
    retry: RetryStream,
    // Incremented once per call to `sleep`.
    pub(crate) retries: IntCounter,
    // Accumulates the duration reported by `next_sleep` before each sleep.
    sleep_seconds: Counter,
    // Increments the `finished` retry counter when this stream is dropped.
    _finished: IncOnDrop,
}
1075
1076impl MetricsRetryStream {
1077    pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
1078        metrics.started.inc();
1079        MetricsRetryStream {
1080            retry,
1081            retries: metrics.retries.clone(),
1082            sleep_seconds: metrics.sleep_seconds.clone(),
1083            _finished: IncOnDrop(metrics.finished.clone()),
1084        }
1085    }
1086
1087    /// How many times [Self::sleep] has been called.
1088    pub fn attempt(&self) -> usize {
1089        self.retry.attempt()
1090    }
1091
1092    /// The next sleep (without jitter for easy printing in logs).
1093    pub fn next_sleep(&self) -> Duration {
1094        self.retry.next_sleep()
1095    }
1096
1097    /// Executes the next sleep in the series.
1098    ///
1099    /// This isn't cancel-safe, so it consumes and returns self, to prevent
1100    /// accidental mis-use.
1101    pub async fn sleep(self) -> Self {
1102        self.retries.inc();
1103        self.sleep_seconds
1104            .inc_by(self.retry.next_sleep().as_secs_f64());
1105        let retry = self.retry.sleep().await;
1106        MetricsRetryStream {
1107            retry,
1108            retries: self.retries,
1109            sleep_seconds: self.sleep_seconds,
1110            _finished: self._finished,
1111        }
1112    }
1113}
1114
/// Encode/decode metrics, one [`CodecMetrics`] per kind of encoded value.
#[derive(Debug)]
pub struct CodecsMetrics {
    pub(crate) state: CodecMetrics,
    pub(crate) state_diff: CodecMetrics,
    pub(crate) batch: CodecMetrics,
    pub(crate) key: CodecMetrics,
    pub(crate) val: CodecMetrics,
    // Intentionally not adding time and diff because they're just
    // `{to,from}_le_bytes`.
}
1125
/// Call counts and cumulative wall-clock time for one codec, recorded by
/// `CodecMetrics::encode` and `CodecMetrics::decode`.
#[derive(Debug)]
pub struct CodecMetrics {
    /// Number of `encode` calls.
    pub(crate) encode_count: IntCounter,
    /// Total seconds spent inside `encode` closures.
    pub(crate) encode_seconds: Counter,
    /// Number of `decode` calls.
    pub(crate) decode_count: IntCounter,
    /// Total seconds spent inside `decode` closures.
    pub(crate) decode_seconds: Counter,
}
1133
1134impl CodecMetrics {
1135    pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1136        let now = Instant::now();
1137        let r = f();
1138        self.encode_count.inc();
1139        self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
1140        r
1141    }
1142
1143    pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1144        let now = Instant::now();
1145        let r = f();
1146        self.decode_count.inc();
1147        self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
1148        r
1149    }
1150}
1151
/// Process-wide counters for how state updates and rollups were handled.
#[derive(Debug)]
pub struct StateMetrics {
    /// Spine diff applications that hit the fast path.
    pub(crate) apply_spine_fast_path: IntCounter,
    /// Spine diff applications that hit the slow path.
    pub(crate) apply_spine_slow_path: IntCounter,
    /// Spine diff applications that hit the lenient compaction apply path.
    pub(crate) apply_spine_slow_path_lenient: IntCounter,
    /// Adjustments made by the lenient compaction apply path.
    pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
    /// Slow-path spine diff applications that needed an extra spine
    /// reconstruction step.
    pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
    /// Spine diff applications that flattened the trace.
    pub(crate) apply_spine_flattened: IntCounter,
    /// State update applications that no-oped due to shared state.
    pub(crate) update_state_noop_path: IntCounter,
    /// State update applications that found no new updates.
    pub(crate) update_state_empty_path: IntCounter,
    /// State update applications that hit the fast path.
    pub(crate) update_state_fast_path: IntCounter,
    /// State update applications that hit the slow path.
    pub(crate) update_state_slow_path: IntCounter,
    /// `fetch_rollup_at_seqno` calls that only worked because of the migration.
    pub(crate) rollup_at_seqno_migration: IntCounter,
    /// `fetch_recent_live_diffs` calls that hit the fast path.
    pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
    /// `fetch_recent_live_diffs` calls that hit the slow path.
    pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
    /// Writers added to the state.
    pub(crate) writer_added: IntCounter,
    /// Writers removed from the state.
    pub(crate) writer_removed: IntCounter,
    /// Times hostname diffs needed to be force applied
    /// (`mz_persist_state_force_applied_hostname`).
    pub(crate) force_apply_hostname: IntCounter,
    /// Rollups written successfully (they may not end up linked into state).
    pub(crate) rollup_write_success: IntCounter,
    /// No-op rollup writes with reason `latest`.
    pub(crate) rollup_write_noop_latest: IntCounter,
    /// No-op rollup writes with reason `truncated`.
    pub(crate) rollup_write_noop_truncated: IntCounter,
}
1174
1175impl StateMetrics {
1176    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
1177        let rollup_write_noop: IntCounterVec = registry.register(metric!(
1178                name: "mz_persist_state_rollup_write_noop",
1179                help: "count of no-op rollup writes",
1180                var_labels: ["reason"],
1181        ));
1182
1183        StateMetrics {
1184            apply_spine_fast_path: registry.register(metric!(
1185                name: "mz_persist_state_apply_spine_fast_path",
1186                help: "count of spine diff applications that hit the fast path",
1187            )),
1188            apply_spine_slow_path: registry.register(metric!(
1189                name: "mz_persist_state_apply_spine_slow_path",
1190                help: "count of spine diff applications that hit the slow path",
1191            )),
1192            apply_spine_slow_path_lenient: registry.register(metric!(
1193                name: "mz_persist_state_apply_spine_slow_path_lenient",
1194                help: "count of spine diff applications that hit the lenient compaction apply path",
1195            )),
1196            apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
1197                name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
1198                help: "count of adjustments made by the lenient compaction apply path",
1199            )),
1200            apply_spine_slow_path_with_reconstruction: registry.register(metric!(
1201                name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
1202                help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
1203            )),
1204            apply_spine_flattened: registry.register(metric!(
1205                name: "mz_persist_state_apply_spine_flattened",
1206                help: "count of spine diff applications that flatten the trace",
1207            )),
1208            update_state_noop_path: registry.register(metric!(
1209                name: "mz_persist_state_update_state_noop_path",
1210                help: "count of state update applications that no-oped due to shared state",
1211            )),
1212            update_state_empty_path: registry.register(metric!(
1213                name: "mz_persist_state_update_state_empty_path",
1214                help: "count of state update applications that found no new updates",
1215            )),
1216            update_state_fast_path: registry.register(metric!(
1217                name: "mz_persist_state_update_state_fast_path",
1218                help: "count of state update applications that hit the fast path",
1219            )),
1220            update_state_slow_path: registry.register(metric!(
1221                name: "mz_persist_state_update_state_slow_path",
1222                help: "count of state update applications that hit the slow path",
1223            )),
1224            rollup_at_seqno_migration: registry.register(metric!(
1225                name: "mz_persist_state_rollup_at_seqno_migration",
1226                help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
1227            )),
1228            fetch_recent_live_diffs_fast_path: registry.register(metric!(
1229                name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
1230                help: "count of fetch_recent_live_diffs that hit the fast path",
1231            )),
1232            fetch_recent_live_diffs_slow_path: registry.register(metric!(
1233                name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
1234                help: "count of fetch_recent_live_diffs that hit the slow path",
1235            )),
1236            writer_added: registry.register(metric!(
1237                name: "mz_persist_state_writer_added",
1238                help: "count of writers added to the state",
1239            )),
1240            writer_removed: registry.register(metric!(
1241                name: "mz_persist_state_writer_removed",
1242                help: "count of writers removed from the state",
1243            )),
1244            force_apply_hostname: registry.register(metric!(
1245                name: "mz_persist_state_force_applied_hostname",
1246                help: "count of when hostname diffs needed to be force applied",
1247            )),
1248            rollup_write_success: registry.register(metric!(
1249                name: "mz_persist_state_rollup_write_success",
1250                help: "count of rollups written successful (may not be linked in to state)",
1251            )),
1252            rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
1253            rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
1254        }
1255    }
1256}
1257
/// Per-shard metrics.
///
/// Most vecs here are labelled by `shard` and `name`; the batch-part version
/// vecs add a `version` label. `_count` is a computed gauge that counts the
/// shards currently tracked in `shards`.
#[derive(Debug)]
pub struct ShardsMetrics {
    // Unlike all the other metrics in here, ShardsMetrics intentionally uses
    // the DeleteOnDrop wrappers. A process might stop using a shard (drop all
    // handles to it) but e.g. the set of commands never changes.
    _count: ComputedIntGauge,
    since: mz_ore::metrics::IntGaugeVec,
    upper: mz_ore::metrics::IntGaugeVec,
    encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
    encoded_diff_size: mz_ore::metrics::IntCounterVec,
    hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
    spine_batch_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    update_count: mz_ore::metrics::UIntGaugeVec,
    rollup_count: mz_ore::metrics::UIntGaugeVec,
    largest_batch_size: mz_ore::metrics::UIntGaugeVec,
    seqnos_held: mz_ore::metrics::UIntGaugeVec,
    seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
    gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
    gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
    gc_finished: mz_ore::metrics::IntCounterVec,
    compaction_applied: mz_ore::metrics::IntCounterVec,
    cmd_succeeded: mz_ore::metrics::IntCounterVec,
    usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
    pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
    stale_version: mz_ore::metrics::UIntGaugeVec,
    blob_gets: mz_ore::metrics::IntCounterVec,
    blob_sets: mz_ore::metrics::IntCounterVec,
    live_writers: mz_ore::metrics::UIntGaugeVec,
    unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
    // NOTE(review): the bare `IntCounterVec` below is the type imported from
    // `prometheus` at the top of this file, whereas the fields above spell out
    // `mz_ore::metrics::IntCounterVec` (the bare `UIntGaugeVec` is imported
    // from `mz_ore::metrics`). Confirm these are the same type, or that the
    // delete-on-drop behaviour described above still applies to them.
    backpressure_emitted_bytes: IntCounterVec,
    backpressure_last_backpressured_bytes: UIntGaugeVec,
    backpressure_retired_bytes: IntCounterVec,
    rewrite_part_count: UIntGaugeVec,
    inline_part_count: UIntGaugeVec,
    inline_part_bytes: UIntGaugeVec,
    compact_batches: UIntGaugeVec,
    compacting_batches: UIntGaugeVec,
    noncompact_batches: UIntGaugeVec,
    schema_registry_version_count: UIntGaugeVec,
    inline_backpressure_count: IntCounterVec,
    // We hand out `Arc<ShardMetrics>` to read and write handles, but store it
    // here as `Weak`. This allows us to discover if it's no longer in use and
    // so we can remove it from the map.
    shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
}
1312
1313impl ShardsMetrics {
1314    fn new(registry: &MetricsRegistry) -> Self {
1315        let shards = Arc::new(Mutex::new(BTreeMap::new()));
1316        let shards_count = Arc::clone(&shards);
1317        ShardsMetrics {
1318            _count: registry.register_computed_gauge(
1319                metric!(
1320                    name: "mz_persist_shard_count",
1321                    help: "count of all active shards on this process",
1322                ),
1323                move || {
1324                    let mut ret = 0;
1325                    Self::compute(&shards_count, |_m| ret += 1);
1326                    ret
1327                },
1328            ),
1329            since: registry.register(metric!(
1330                name: "mz_persist_shard_since",
1331                help: "since by shard",
1332                var_labels: ["shard", "name"],
1333            )),
1334            upper: registry.register(metric!(
1335                name: "mz_persist_shard_upper",
1336                help: "upper by shard",
1337                var_labels: ["shard", "name"],
1338            )),
1339            encoded_rollup_size: registry.register(metric!(
1340                name: "mz_persist_shard_rollup_size_bytes",
1341                help: "total encoded rollup size by shard",
1342                var_labels: ["shard", "name"],
1343            )),
1344            encoded_diff_size: registry.register(metric!(
1345                name: "mz_persist_shard_diff_size_bytes",
1346                help: "total encoded diff size by shard",
1347                var_labels: ["shard", "name"],
1348            )),
1349            hollow_batch_count: registry.register(metric!(
1350                name: "mz_persist_shard_hollow_batch_count",
1351                help: "count of hollow batches by shard",
1352                var_labels: ["shard", "name"],
1353            )),
1354            spine_batch_count: registry.register(metric!(
1355                name: "mz_persist_shard_spine_batch_count",
1356                help: "count of spine batches by shard",
1357                var_labels: ["shard", "name"],
1358            )),
1359            batch_part_count: registry.register(metric!(
1360                name: "mz_persist_shard_batch_part_count",
1361                help: "count of batch parts by shard",
1362                var_labels: ["shard", "name"],
1363            )),
1364            batch_part_version_count: registry.register(metric!(
1365                name: "mz_persist_shard_batch_part_version_count",
1366                help: "count of batch parts by shard and version",
1367                var_labels: ["shard", "name", "version"],
1368            )),
1369            batch_part_version_bytes: registry.register(metric!(
1370                name: "mz_persist_shard_batch_part_version_bytes",
1371                help: "total bytes in batch parts by shard and version",
1372                var_labels: ["shard", "name", "version"],
1373            )),
1374            update_count: registry.register(metric!(
1375                name: "mz_persist_shard_update_count",
1376                help: "count of updates by shard",
1377                var_labels: ["shard", "name"],
1378            )),
1379            rollup_count: registry.register(metric!(
1380                name: "mz_persist_shard_rollup_count",
1381                help: "count of rollups by shard",
1382                var_labels: ["shard", "name"],
1383            )),
1384            largest_batch_size: registry.register(metric!(
1385                name: "mz_persist_shard_largest_batch_size",
1386                help: "largest encoded batch size by shard",
1387                var_labels: ["shard", "name"],
1388            )),
1389            seqnos_held: registry.register(metric!(
1390                name: "mz_persist_shard_seqnos_held",
1391                help: "maximum count of gc-ineligible states by shard",
1392                var_labels: ["shard", "name"],
1393            )),
1394            seqnos_since_last_rollup: registry.register(metric!(
1395                name: "mz_persist_shard_seqnos_since_last_rollup",
1396                help: "count of seqnos since last rollup",
1397                var_labels: ["shard", "name"],
1398            )),
1399            gc_seqno_held_parts: registry.register(metric!(
1400                name: "mz_persist_shard_gc_seqno_held_parts",
1401                help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
1402                var_labels: ["shard", "name"],
1403            )),
1404            gc_live_diffs: registry.register(metric!(
1405                name: "mz_persist_shard_gc_live_diffs",
1406                help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
1407                var_labels: ["shard", "name"],
1408            )),
1409            gc_finished: registry.register(metric!(
1410                name: "mz_persist_shard_gc_finished",
1411                help: "count of garbage collections finished by shard",
1412                var_labels: ["shard", "name"],
1413            )),
1414            compaction_applied: registry.register(metric!(
1415                name: "mz_persist_shard_compaction_applied",
1416                help: "count of compactions applied to state by shard",
1417                var_labels: ["shard", "name"],
1418            )),
1419            cmd_succeeded: registry.register(metric!(
1420                name: "mz_persist_shard_cmd_succeeded",
1421                help: "count of commands succeeded by shard",
1422                var_labels: ["shard", "name"],
1423            )),
1424            usage_current_state_batches_bytes: registry.register(metric!(
1425                name: "mz_persist_shard_usage_current_state_batches_bytes",
1426                help: "data in batches/parts referenced by current version of state",
1427                var_labels: ["shard", "name"],
1428            )),
1429            usage_current_state_rollups_bytes: registry.register(metric!(
1430                name: "mz_persist_shard_usage_current_state_rollups_bytes",
1431                help: "data in rollups referenced by current version of state",
1432                var_labels: ["shard", "name"],
1433            )),
1434            usage_referenced_not_current_state_bytes: registry.register(metric!(
1435                name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
1436                help: "data referenced only by a previous version of state",
1437                var_labels: ["shard", "name"],
1438            )),
1439            usage_not_leaked_not_referenced_bytes: registry.register(metric!(
1440                name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
1441                help: "data written by an active writer but not referenced by any version of state",
1442                var_labels: ["shard", "name"],
1443            )),
1444            usage_leaked_bytes: registry.register(metric!(
1445                name: "mz_persist_shard_usage_leaked_bytes",
1446                help: "data reclaimable by a leaked blob detector",
1447                var_labels: ["shard", "name"],
1448            )),
1449            pubsub_push_diff_applied: registry.register(metric!(
1450                name: "mz_persist_shard_pubsub_diff_applied",
1451                help: "number of diffs received via pubsub that applied",
1452                var_labels: ["shard", "name"],
1453            )),
1454            pubsub_push_diff_not_applied_stale: registry.register(metric!(
1455                name: "mz_persist_shard_pubsub_diff_not_applied_stale",
1456                help: "number of diffs received via pubsub that did not apply due to staleness",
1457                var_labels: ["shard", "name"],
1458            )),
1459            pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
1460                name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
1461                help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
1462                var_labels: ["shard", "name"],
1463            )),
1464            stale_version: registry.register(metric!(
1465                name: "mz_persist_shard_stale_version",
1466                help: "indicates whether the current version of the shard is less than the current version of the code",
1467                var_labels: ["shard", "name"],
1468            )),
1469            blob_gets: registry.register(metric!(
1470                name: "mz_persist_shard_blob_gets",
1471                help: "number of Blob::get calls for this shard",
1472                var_labels: ["shard", "name"],
1473            )),
1474            blob_sets: registry.register(metric!(
1475                name: "mz_persist_shard_blob_sets",
1476                help: "number of Blob::set calls for this shard",
1477                var_labels: ["shard", "name"],
1478            )),
1479            live_writers: registry.register(metric!(
1480                name: "mz_persist_shard_live_writers",
1481                help: "number of writers that have recently appended updates to this shard",
1482                var_labels: ["shard", "name"],
1483            )),
1484            unconsolidated_snapshot: registry.register(metric!(
1485                name: "mz_persist_shard_unconsolidated_snapshot",
1486                help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
1487                var_labels: ["shard", "name"],
1488            )),
1489            backpressure_emitted_bytes: registry.register(metric!(
1490                name: "mz_persist_backpressure_emitted_bytes",
1491                help: "A counter with the number of emitted bytes.",
1492                var_labels: ["shard", "name"],
1493            )),
1494            backpressure_last_backpressured_bytes: registry.register(metric!(
1495                name: "mz_persist_backpressure_last_backpressured_bytes",
1496                help: "The last count of bytes we are waiting to be retired in \
1497                    the operator. This cannot be directly compared to \
1498                    `retired_bytes`, but CAN indicate that backpressure is happening.",
1499                var_labels: ["shard", "name"],
1500            )),
1501            backpressure_retired_bytes: registry.register(metric!(
1502                name: "mz_persist_backpressure_retired_bytes",
1503                help:"A counter with the number of bytes retired by downstream processing.",
1504                var_labels: ["shard", "name"],
1505            )),
1506            rewrite_part_count: registry.register(metric!(
1507                name: "mz_persist_shard_rewrite_part_count",
1508                help: "count of batch parts with rewrites by shard",
1509                var_labels: ["shard", "name"],
1510            )),
1511            inline_part_count: registry.register(metric!(
1512                name: "mz_persist_shard_inline_part_count",
1513                help: "count of parts inline in shard metadata",
1514                var_labels: ["shard", "name"],
1515            )),
1516            inline_part_bytes: registry.register(metric!(
1517                name: "mz_persist_shard_inline_part_bytes",
1518                help: "total size of parts inline in shard metadata",
1519                var_labels: ["shard", "name"],
1520            )),
1521            compact_batches: registry.register(metric!(
1522                name: "mz_persist_shard_compact_batches",
1523                help: "number of fully compact batches in the shard",
1524                var_labels: ["shard", "name"],
1525            )),
1526            compacting_batches: registry.register(metric!(
1527                name: "mz_persist_shard_compacting_batches",
1528                help: "number of batches in the shard with compactions in progress",
1529                var_labels: ["shard", "name"],
1530            )),
1531            noncompact_batches: registry.register(metric!(
1532                name: "mz_persist_shard_noncompact_batches",
1533                help: "number of batches in the shard that aren't compact and have no ongoing compaction",
1534                var_labels: ["shard", "name"],
1535            )),
1536            schema_registry_version_count: registry.register(metric!(
1537                name: "mz_persist_shard_schema_registry_version_count",
1538                help: "count of versions in the schema registry",
1539                var_labels: ["shard", "name"],
1540            )),
1541            inline_backpressure_count: registry.register(metric!(
1542                name: "mz_persist_shard_inline_backpressure_count",
1543                help: "count of CaA attempts retried because of inline backpressure",
1544                var_labels: ["shard", "name"],
1545            )),
1546            shards,
1547        }
1548    }
1549
1550    pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
1551        let mut shards = self.shards.lock().expect("mutex poisoned");
1552        if let Some(shard) = shards.get(shard_id) {
1553            if let Some(shard) = shard.upgrade() {
1554                return Arc::clone(&shard);
1555            } else {
1556                assert!(shards.remove(shard_id).is_some());
1557            }
1558        }
1559        let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
1560        assert!(
1561            shards
1562                .insert(shard_id.clone(), Arc::downgrade(&shard))
1563                .is_none()
1564        );
1565        shard
1566    }
1567
1568    fn compute<F: FnMut(&ShardMetrics)>(
1569        shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
1570        mut f: F,
1571    ) {
1572        let mut shards = shards.lock().expect("mutex poisoned");
1573        let mut deleted_shards = Vec::new();
1574        for (shard_id, metrics) in shards.iter() {
1575            if let Some(metrics) = metrics.upgrade() {
1576                f(&metrics);
1577            } else {
1578                deleted_shards.push(shard_id.clone());
1579            }
1580        }
1581        for deleted_shard_id in deleted_shards {
1582            assert!(shards.remove(&deleted_shard_id).is_some());
1583        }
1584    }
1585}
1586
/// Per-shard metric handles, labelled with the values `[shard, name]`.
///
/// Built by [`ShardMetrics::new`] from the metric vectors in [`ShardsMetrics`].
/// Most fields are `DeleteOnDrop*` handles; presumably their label values are
/// removed from the underlying vectors when this struct is dropped
/// (NOTE(review): confirm against `mz_ore::metrics`).
#[derive(Debug)]
pub struct ShardMetrics {
    /// The shard these metrics describe; its string form is the `shard` label.
    pub shard_id: ShardId,
    /// The value used for the `name` label.
    pub name: String,
    /// Encoded since frontier; written by [`ShardMetrics::set_since`].
    pub since: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    /// Encoded upper frontier; written by [`ShardMetrics::set_upper`].
    pub upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub largest_batch_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Taken from the `encoded_rollup_size` metric vector.
    pub latest_rollup_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub encoded_diff_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub hollow_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub spine_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub batch_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Vector from which per-version part-count gauges are created on demand,
    /// with labels `[shard, name, version]`.
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    /// Vector from which per-version part-bytes gauges are created on demand,
    /// with labels `[shard, name, version]`.
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    /// Per-version gauges created lazily by
    /// [`ShardMetrics::set_batch_part_versions`], keyed by version.
    batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
    pub update_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub rollup_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_held: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Count of seqnos since the last rollup.
    pub seqnos_since_last_rollup: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Count of parts referenced by some live state but not the current state
    /// (i.e. parts kept only to satisfy seqno holds) at GC time.
    pub gc_seqno_held_parts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Number of diffs (equivalently, seqnos) present in consensus state at GC
    /// time.
    pub gc_live_diffs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Data in batches/parts referenced by the current version of state.
    pub usage_current_state_batches_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Data in rollups referenced by the current version of state.
    pub usage_current_state_rollups_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Data referenced only by a previous version of state.
    pub usage_referenced_not_current_state_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Data written by an active writer but not referenced by any version of
    /// state.
    pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Data reclaimable by a leaked blob detector.
    pub usage_leaked_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Count of garbage collections finished.
    pub gc_finished: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Count of compactions applied to state.
    pub compaction_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Count of commands that succeeded.
    pub cmd_succeeded: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Diffs received via pubsub that applied.
    pub pubsub_push_diff_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Diffs received via pubsub that did not apply because they were stale.
    pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Diffs received via pubsub that did not apply due to out-of-order
    /// delivery.
    pub pubsub_push_diff_not_applied_out_of_order: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Whether the shard's current version is older than the code's version.
    pub stale_version: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Number of `Blob::get` calls for this shard.
    pub blob_gets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Number of `Blob::set` calls for this shard.
    pub blob_sets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Number of writers that have recently appended updates to this shard.
    pub live_writers: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// In `snapshot_and_read`, the number of times consolidating the raw data
    /// wasn't enough to produce consolidated output.
    pub unconsolidated_snapshot: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    /// Counter of emitted bytes (backpressure).
    pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    /// Last count of bytes waiting to be retired in the operator.
    pub backpressure_last_backpressured_bytes: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
    /// Counter of bytes retired by downstream processing.
    pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    /// Count of batch parts with rewrites.
    pub rewrite_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Count of parts stored inline in shard metadata.
    pub inline_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Total size of parts stored inline in shard metadata.
    pub inline_part_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Number of fully compact batches in the shard.
    pub compact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Number of batches with compactions in progress.
    pub compacting_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Number of batches that aren't compact and have no ongoing compaction.
    pub noncompact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Count of versions in the schema registry.
    pub schema_registry_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Count of compare-and-append attempts retried because of inline
    /// backpressure.
    pub inline_backpressure_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
1636
1637impl ShardMetrics {
1638    pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
1639        let shard = shard_id.to_string();
1640        ShardMetrics {
1641            shard_id: *shard_id,
1642            name: name.to_string(),
1643            since: shards_metrics
1644                .since
1645                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1646            upper: shards_metrics
1647                .upper
1648                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1649            latest_rollup_size: shards_metrics
1650                .encoded_rollup_size
1651                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1652            encoded_diff_size: shards_metrics
1653                .encoded_diff_size
1654                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1655            hollow_batch_count: shards_metrics
1656                .hollow_batch_count
1657                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1658            spine_batch_count: shards_metrics
1659                .spine_batch_count
1660                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1661            batch_part_count: shards_metrics
1662                .batch_part_count
1663                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1664            batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
1665            batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
1666            batch_part_version_map: Mutex::new(BTreeMap::new()),
1667            update_count: shards_metrics
1668                .update_count
1669                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1670            rollup_count: shards_metrics
1671                .rollup_count
1672                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1673            largest_batch_size: shards_metrics
1674                .largest_batch_size
1675                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1676            seqnos_held: shards_metrics
1677                .seqnos_held
1678                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1679            seqnos_since_last_rollup: shards_metrics
1680                .seqnos_since_last_rollup
1681                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1682            gc_seqno_held_parts: shards_metrics
1683                .gc_seqno_held_parts
1684                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1685            gc_live_diffs: shards_metrics
1686                .gc_live_diffs
1687                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1688            gc_finished: shards_metrics
1689                .gc_finished
1690                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1691            compaction_applied: shards_metrics
1692                .compaction_applied
1693                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1694            cmd_succeeded: shards_metrics
1695                .cmd_succeeded
1696                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1697            usage_current_state_batches_bytes: shards_metrics
1698                .usage_current_state_batches_bytes
1699                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1700            usage_current_state_rollups_bytes: shards_metrics
1701                .usage_current_state_rollups_bytes
1702                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1703            usage_referenced_not_current_state_bytes: shards_metrics
1704                .usage_referenced_not_current_state_bytes
1705                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1706            usage_not_leaked_not_referenced_bytes: shards_metrics
1707                .usage_not_leaked_not_referenced_bytes
1708                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1709            usage_leaked_bytes: shards_metrics
1710                .usage_leaked_bytes
1711                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1712            pubsub_push_diff_applied: shards_metrics
1713                .pubsub_push_diff_applied
1714                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1715            pubsub_push_diff_not_applied_stale: shards_metrics
1716                .pubsub_push_diff_not_applied_stale
1717                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1718            pubsub_push_diff_not_applied_out_of_order: shards_metrics
1719                .pubsub_push_diff_not_applied_out_of_order
1720                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1721            stale_version: shards_metrics
1722                .stale_version
1723                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1724            blob_gets: shards_metrics
1725                .blob_gets
1726                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1727            blob_sets: shards_metrics
1728                .blob_sets
1729                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1730            live_writers: shards_metrics
1731                .live_writers
1732                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1733            unconsolidated_snapshot: shards_metrics
1734                .unconsolidated_snapshot
1735                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1736            backpressure_emitted_bytes: Arc::new(
1737                shards_metrics
1738                    .backpressure_emitted_bytes
1739                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1740            ),
1741            backpressure_last_backpressured_bytes: Arc::new(
1742                shards_metrics
1743                    .backpressure_last_backpressured_bytes
1744                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1745            ),
1746            backpressure_retired_bytes: Arc::new(
1747                shards_metrics
1748                    .backpressure_retired_bytes
1749                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1750            ),
1751            rewrite_part_count: shards_metrics
1752                .rewrite_part_count
1753                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1754            inline_part_count: shards_metrics
1755                .inline_part_count
1756                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1757            inline_part_bytes: shards_metrics
1758                .inline_part_bytes
1759                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1760            compact_batches: shards_metrics
1761                .compact_batches
1762                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1763            compacting_batches: shards_metrics
1764                .compacting_batches
1765                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1766            noncompact_batches: shards_metrics
1767                .noncompact_batches
1768                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1769            schema_registry_version_count: shards_metrics
1770                .schema_registry_version_count
1771                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1772            inline_backpressure_count: shards_metrics
1773                .inline_backpressure_count
1774                .get_delete_on_drop_metric(vec![shard, name.to_string()]),
1775        }
1776    }
1777
1778    pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
1779        self.since.set(encode_ts_metric(since))
1780    }
1781
1782    pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
1783        self.upper.set(encode_ts_metric(upper))
1784    }
1785
1786    pub(crate) fn set_batch_part_versions<'a>(
1787        &self,
1788        batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
1789    ) {
1790        let mut map = self
1791            .batch_part_version_map
1792            .lock()
1793            .expect("mutex should not be poisoned");
1794        // NB: It's a bit sus that the below assumes that no one else is
1795        // concurrently modifying the atomics in the gauges, but we're holding
1796        // the mutex this whole time, so it should be true.
1797
1798        // We want to do this in a way that avoids allocating (e.g. summing up a
1799        // map). First reset everything.
1800        for x in map.values() {
1801            x.batch_part_version_count.set(0);
1802            x.batch_part_version_bytes.set(0);
1803        }
1804
1805        // Then go through the iterator, creating new entries as necessary and
1806        // adding.
1807        for (key, bytes) in batch_parts_by_version {
1808            if !map.contains_key(key) {
1809                map.insert(
1810                    key.to_owned(),
1811                    BatchPartVersionMetrics {
1812                        batch_part_version_count: self
1813                            .batch_part_version_count
1814                            .get_delete_on_drop_metric(vec![
1815                                self.shard_id.to_string(),
1816                                self.name.clone(),
1817                                key.to_owned(),
1818                            ]),
1819                        batch_part_version_bytes: self
1820                            .batch_part_version_bytes
1821                            .get_delete_on_drop_metric(vec![
1822                                self.shard_id.to_string(),
1823                                self.name.clone(),
1824                                key.to_owned(),
1825                            ]),
1826                    },
1827                );
1828            }
1829            let value = map.get(key).expect("inserted above");
1830            value.batch_part_version_count.inc();
1831            value.batch_part_version_bytes.add(u64::cast_from(bytes));
1832        }
1833    }
1834}
1835
/// The pair of gauges tracked for one batch part version by
/// [`ShardMetrics::set_batch_part_versions`].
#[derive(Debug)]
pub struct BatchPartVersionMetrics {
    /// Number of batch parts with this version seen in the most recent
    /// `set_batch_part_versions` call.
    pub batch_part_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Sum of the byte sizes of those batch parts.
    pub batch_part_version_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
1841
/// Metrics recorded by audits of persist usage.
///
/// The `step_*` counters accumulate seconds. They are children of the single
/// `mz_persist_audit_step_seconds` vector and are distinguished by its `step`
/// label.
#[derive(Debug)]
pub struct UsageAuditMetrics {
    /// Size of all batch parts stored in Blob
    pub blob_batch_part_bytes: UIntGauge,
    /// Count of batch parts stored in Blob
    pub blob_batch_part_count: UIntGauge,
    /// Size of all state rollups stored in Blob
    pub blob_rollup_bytes: UIntGauge,
    /// Count of state rollups stored in Blob
    pub blob_rollup_count: UIntGauge,
    /// Size of Blob
    pub blob_bytes: UIntGauge,
    /// Count of all blobs
    pub blob_count: UIntGauge,
    /// Time spent fetching blob metadata (seconds, `step="blob_metadata"`)
    pub step_blob_metadata: Counter,
    /// Time spent fetching state versions (seconds, `step="state"`)
    pub step_state: Counter,
    /// Time spent doing math (seconds, `step="math"`)
    pub step_math: Counter,
}
1864
1865impl UsageAuditMetrics {
1866    fn new(registry: &MetricsRegistry) -> Self {
1867        let step_timings: CounterVec = registry.register(metric!(
1868                name: "mz_persist_audit_step_seconds",
1869                help: "time spent on individual steps of audit",
1870                var_labels: ["step"],
1871        ));
1872        UsageAuditMetrics {
1873            blob_batch_part_bytes: registry.register(metric!(
1874                name: "mz_persist_audit_blob_batch_part_bytes",
1875                help: "total size of batch parts in blob",
1876            )),
1877            blob_batch_part_count: registry.register(metric!(
1878                name: "mz_persist_audit_blob_batch_part_count",
1879                help: "count of batch parts in blob",
1880            )),
1881            blob_rollup_bytes: registry.register(metric!(
1882                name: "mz_persist_audit_blob_rollup_bytes",
1883                help: "total size of state rollups stored in blob",
1884            )),
1885            blob_rollup_count: registry.register(metric!(
1886                name: "mz_persist_audit_blob_rollup_count",
1887                help: "count of all state rollups in blob",
1888            )),
1889            blob_bytes: registry.register(metric!(
1890                name: "mz_persist_audit_blob_bytes",
1891                help: "total size of blob",
1892            )),
1893            blob_count: registry.register(metric!(
1894                name: "mz_persist_audit_blob_count",
1895                help: "count of all blobs",
1896            )),
1897            step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
1898            step_state: step_timings.with_label_values(&["state"]),
1899            step_math: step_timings.with_label_values(&["math"]),
1900        }
1901    }
1902}
1903
/// Represents a change in a number of updates kept in a data structure
/// (e.g., a buffer length or capacity change).
///
/// The variant encodes the sign of the change and the `u64` payload holds its
/// magnitude. See [`UpdateDelta::new`].
#[derive(Debug)]
pub enum UpdateDelta {
    /// A negative delta in the number of updates; the payload is the amount by
    /// which the count shrank.
    Negative(u64),
    /// A non-negative delta in the number of updates; the payload is the
    /// amount by which the count grew (possibly zero).
    NonNegative(u64),
}
1913
1914impl UpdateDelta {
1915    /// Creates a new `UpdateDelta` from the difference between a new value
1916    /// for a number of updates and the corresponding old value.
1917    pub fn new(new: usize, old: usize) -> Self {
1918        if new < old {
1919            UpdateDelta::Negative(CastFrom::cast_from(old - new))
1920        } else {
1921            UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
1922        }
1923    }
1924}
1925
/// Metrics for the persist sink. (While this lies slightly outside the usual
/// abstraction boundary of the client, it's convenient to manage them together.)
///
/// The counters aggregate across all workers and sink instances. Per-worker
/// peak gauges are handed out through [`SinkMetrics::for_worker`].
#[derive(Debug, Clone)]
pub struct SinkMetrics {
    /// Cumulative record insertions made to the correction buffer across workers
    correction_insertions_total: IntCounter,
    /// Cumulative record deletions made to the correction buffer across workers
    correction_deletions_total: IntCounter,
    /// Cumulative capacity increases made to the correction buffer across workers
    correction_capacity_increases_total: IntCounter,
    /// Cumulative capacity decreases made to the correction buffer across workers
    correction_capacity_decreases_total: IntCounter,
    /// Maximum length observed for any one correction buffer per worker
    /// (labelled by `worker_id`)
    correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
    /// Maximum capacity observed for any one correction buffer per worker
    /// (labelled by `worker_id`)
    correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
}
1943
1944impl SinkMetrics {
1945    fn new(registry: &MetricsRegistry) -> Self {
1946        SinkMetrics {
1947            correction_insertions_total: registry.register(metric!(
1948                name: "mz_persist_sink_correction_insertions_total",
1949                help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
1950            )),
1951            correction_deletions_total: registry.register(metric!(
1952                name: "mz_persist_sink_correction_deletions_total",
1953                help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
1954            )),
1955            correction_capacity_increases_total: registry.register(metric!(
1956                name: "mz_persist_sink_correction_capacity_increases_total",
1957                help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
1958            )),
1959            correction_capacity_decreases_total: registry.register(metric!(
1960                name: "mz_persist_sink_correction_capacity_decreases_total",
1961                help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
1962            )),
1963            correction_max_per_sink_worker_len_updates: registry.register(metric!(
1964                name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
1965                help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
1966                var_labels: ["worker_id"],
1967            )),
1968            correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
1969                name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
1970                help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
1971                var_labels: ["worker_id"],
1972            )),
1973        }
1974    }
1975
1976    /// Obtains a `SinkWorkerMetrics` instance, which allows for metric reporting
1977    /// from a specific `persist_sink` instance for a given worker. The reports will
1978    /// update metrics shared across workers, but provide per-worker contributions
1979    /// to them.
1980    pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
1981        let worker = worker_id.to_string();
1982        let correction_max_per_sink_worker_len_updates = self
1983            .correction_max_per_sink_worker_len_updates
1984            .with_label_values(&[&worker]);
1985        let correction_max_per_sink_worker_capacity_updates = self
1986            .correction_max_per_sink_worker_capacity_updates
1987            .with_label_values(&[&worker]);
1988        SinkWorkerMetrics {
1989            correction_max_per_sink_worker_len_updates,
1990            correction_max_per_sink_worker_capacity_updates,
1991        }
1992    }
1993
1994    /// Reports updates to the length and capacity of the correction buffer in the
1995    /// `write_batches` operator of a `persist_sink`.
1996    ///
1997    /// This method updates monotonic metrics based on the deltas and thus can be
1998    /// called across workers and instances of `persist_sink`.
1999    pub fn report_correction_update_deltas(
2000        &self,
2001        correction_len_delta: UpdateDelta,
2002        correction_cap_delta: UpdateDelta,
2003    ) {
2004        // Report insertions or deletions.
2005        match correction_len_delta {
2006            UpdateDelta::NonNegative(delta) => {
2007                if delta > 0 {
2008                    self.correction_insertions_total.inc_by(delta)
2009                }
2010            }
2011            UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
2012        }
2013        // Report capacity increases or decreases.
2014        match correction_cap_delta {
2015            UpdateDelta::NonNegative(delta) => {
2016                if delta > 0 {
2017                    self.correction_capacity_increases_total.inc_by(delta)
2018                }
2019            }
2020            UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
2021        }
2022    }
2023}
2024
/// Metrics for the persist sink that are labeled per-worker.
///
/// Obtained from [`SinkMetrics::for_worker`]. Both gauges carry that worker's
/// `worker_id` label and only ever ratchet upward via
/// [`SinkWorkerMetrics::report_correction_update_totals`].
#[derive(Clone, Debug)]
pub struct SinkWorkerMetrics {
    /// Peak correction buffer length reported on this worker.
    correction_max_per_sink_worker_len_updates: UIntGauge,
    /// Peak correction buffer capacity reported on this worker.
    correction_max_per_sink_worker_capacity_updates: UIntGauge,
}
2031
2032impl SinkWorkerMetrics {
2033    /// Reports the length and capacity of the correction buffer in the `write_batches`
2034    /// operator of `persist_sink`.
2035    ///
2036    /// This method is used to update metrics that are kept per worker.
2037    pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
2038        // Maintain per-worker peaks.
2039        let correction_len = CastFrom::cast_from(correction_len);
2040        if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
2041            self.correction_max_per_sink_worker_len_updates
2042                .set(correction_len);
2043        }
2044        let correction_cap = CastFrom::cast_from(correction_cap);
2045        if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
2046            self.correction_max_per_sink_worker_capacity_updates
2047                .set(correction_cap);
2048        }
2049    }
2050}
2051
2052/// A minimal set of metrics imported into honeycomb for alerting.
2053#[derive(Debug)]
2054pub struct AlertsMetrics {
2055    pub(crate) blob_failures: IntCounter,
2056    pub(crate) consensus_failures: IntCounter,
2057}
2058
2059impl AlertsMetrics {
2060    fn new(registry: &MetricsRegistry) -> Self {
2061        AlertsMetrics {
2062            blob_failures: registry.register(metric!(
2063                name: "mz_persist_blob_failures",
2064                help: "count of all blob operation failures",
2065                const_labels: {"honeycomb" => "import"},
2066            )),
2067            consensus_failures: registry.register(metric!(
2068                name: "mz_persist_consensus_failures",
2069                help: "count of determinate consensus operation failures",
2070                const_labels: {"honeycomb" => "import"},
2071            )),
2072        }
2073    }
2074}
2075
2076/// Metrics for the PubSubServer implementation.
2077#[derive(Debug)]
2078pub struct PubSubServerMetrics {
2079    pub(crate) active_connections: UIntGauge,
2080    pub(crate) broadcasted_diff_count: IntCounter,
2081    pub(crate) broadcasted_diff_bytes: IntCounter,
2082    pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,
2083
2084    pub(crate) push_seconds: Counter,
2085    pub(crate) subscribe_seconds: Counter,
2086    pub(crate) unsubscribe_seconds: Counter,
2087    pub(crate) connection_cleanup_seconds: Counter,
2088
2089    pub(crate) push_call_count: IntCounter,
2090    pub(crate) subscribe_call_count: IntCounter,
2091    pub(crate) unsubscribe_call_count: IntCounter,
2092}
2093
2094impl PubSubServerMetrics {
2095    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
2096        let op_timings: CounterVec = registry.register(metric!(
2097                name: "mz_persist_pubsub_server_operation_seconds",
2098                help: "time spent in pubsub server performing each operation",
2099                var_labels: ["op"],
2100        ));
2101        let call_count: IntCounterVec = registry.register(metric!(
2102                name: "mz_persist_pubsub_server_call_count",
2103                help: "count of each pubsub server message received",
2104                var_labels: ["call"],
2105        ));
2106
2107        Self {
2108            active_connections: registry.register(metric!(
2109                    name: "mz_persist_pubsub_server_active_connections",
2110                    help: "number of active connections to server",
2111            )),
2112            broadcasted_diff_count: registry.register(metric!(
2113                    name: "mz_persist_pubsub_server_broadcasted_diff_count",
2114                    help: "count of total broadcast diff messages sent",
2115            )),
2116            broadcasted_diff_bytes: registry.register(metric!(
2117                    name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
2118                    help: "count of total broadcast diff bytes sent",
2119            )),
2120            broadcasted_diff_dropped_channel_full: registry.register(metric!(
2121                    name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
2122                    help: "count of diffs dropped due to full connection channel",
2123            )),
2124
2125            push_seconds: op_timings.with_label_values(&["push"]),
2126            subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
2127            unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
2128            connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),
2129
2130            push_call_count: call_count.with_label_values(&["push"]),
2131            subscribe_call_count: call_count.with_label_values(&["subscribe"]),
2132            unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
2133        }
2134    }
2135}
2136
2137/// Metrics for the PubSubClient implementation.
2138#[derive(Debug)]
2139pub struct PubSubClientMetrics {
2140    pub sender: PubSubClientSenderMetrics,
2141    pub receiver: PubSubClientReceiverMetrics,
2142    pub grpc_connection: PubSubGrpcClientConnectionMetrics,
2143}
2144
2145impl PubSubClientMetrics {
2146    fn new(registry: &MetricsRegistry) -> Self {
2147        PubSubClientMetrics {
2148            sender: PubSubClientSenderMetrics::new(registry),
2149            receiver: PubSubClientReceiverMetrics::new(registry),
2150            grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
2151        }
2152    }
2153}
2154
2155#[derive(Debug)]
2156pub struct PubSubGrpcClientConnectionMetrics {
2157    pub(crate) connected: UIntGauge,
2158    pub(crate) connection_established_count: IntCounter,
2159    pub(crate) connect_call_attempt_count: IntCounter,
2160    pub(crate) broadcast_recv_lagged_count: IntCounter,
2161    pub(crate) grpc_error_count: IntCounter,
2162}
2163
2164impl PubSubGrpcClientConnectionMetrics {
2165    fn new(registry: &MetricsRegistry) -> Self {
2166        Self {
2167            connected: registry.register(metric!(
2168                    name: "mz_persist_pubsub_client_grpc_connected",
2169                    help: "whether the grpc client is currently connected",
2170            )),
2171            connection_established_count: registry.register(metric!(
2172                    name: "mz_persist_pubsub_client_grpc_connection_established_count",
2173                    help: "count of grpc connection establishments to pubsub server",
2174            )),
2175            connect_call_attempt_count: registry.register(metric!(
2176                    name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
2177                    help: "count of connection call attempts (including retries) to pubsub server",
2178            )),
2179            broadcast_recv_lagged_count: registry.register(metric!(
2180                    name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
2181                    help: "times a message was missed by broadcast receiver due to lag",
2182            )),
2183            grpc_error_count: registry.register(metric!(
2184                    name: "mz_persist_pubsub_client_grpc_error_count",
2185                    help: "count of grpc errors received",
2186            )),
2187        }
2188    }
2189}
2190
2191#[derive(Clone, Debug)]
2192pub struct PubSubClientReceiverMetrics {
2193    pub(crate) push_received: IntCounter,
2194    pub(crate) unknown_message_received: IntCounter,
2195    pub(crate) approx_diff_latency_seconds: Histogram,
2196
2197    pub(crate) state_pushed_diff_fast_path: IntCounter,
2198    pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
2199    pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
2200}
2201
2202impl PubSubClientReceiverMetrics {
2203    fn new(registry: &MetricsRegistry) -> Self {
2204        let call_received: IntCounterVec = registry.register(metric!(
2205                name: "mz_persist_pubsub_client_call_received",
2206                help: "times a pubsub client call was received",
2207                var_labels: ["call"],
2208        ));
2209
2210        Self {
2211            push_received: call_received.with_label_values(&["push"]),
2212            unknown_message_received: call_received.with_label_values(&["unknown"]),
2213            approx_diff_latency_seconds: registry.register(metric!(
2214                name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
2215                help: "histogram of (approximate) latency between sending a diff and applying it",
2216                buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
2217            )),
2218
2219            state_pushed_diff_fast_path: registry.register(metric!(
2220                name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
2221                help: "count fast-path state push_diff calls",
2222            )),
2223            state_pushed_diff_slow_path_succeeded: registry.register(metric!(
2224                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
2225                help: "count of successful slow-path state push_diff calls",
2226            )),
2227            state_pushed_diff_slow_path_failed: registry.register(metric!(
2228                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
2229                help: "count of unsuccessful slow-path state push_diff calls",
2230            )),
2231        }
2232    }
2233}
2234
2235#[derive(Debug)]
2236pub struct PubSubClientSenderMetrics {
2237    pub push: PubSubClientCallMetrics,
2238    pub subscribe: PubSubClientCallMetrics,
2239    pub unsubscribe: PubSubClientCallMetrics,
2240}
2241
2242#[derive(Debug)]
2243pub struct PubSubClientCallMetrics {
2244    pub(crate) succeeded: IntCounter,
2245    pub(crate) bytes_sent: IntCounter,
2246    pub(crate) failed: IntCounter,
2247}
2248
2249impl PubSubClientSenderMetrics {
2250    fn new(registry: &MetricsRegistry) -> Self {
2251        let call_bytes_sent: IntCounterVec = registry.register(metric!(
2252                name: "mz_persist_pubsub_client_call_bytes_sent",
2253                help: "number of bytes sent for a given pubsub client call",
2254                var_labels: ["call"],
2255        ));
2256        let call_succeeded: IntCounterVec = registry.register(metric!(
2257                name: "mz_persist_pubsub_client_call_succeeded",
2258                help: "times a pubsub client call succeeded",
2259                var_labels: ["call"],
2260        ));
2261        let call_failed: IntCounterVec = registry.register(metric!(
2262                name: "mz_persist_pubsub_client_call_failed",
2263                help: "times a pubsub client call failed",
2264                var_labels: ["call"],
2265        ));
2266
2267        Self {
2268            push: PubSubClientCallMetrics {
2269                succeeded: call_succeeded.with_label_values(&["push"]),
2270                failed: call_failed.with_label_values(&["push"]),
2271                bytes_sent: call_bytes_sent.with_label_values(&["push"]),
2272            },
2273            subscribe: PubSubClientCallMetrics {
2274                succeeded: call_succeeded.with_label_values(&["subscribe"]),
2275                failed: call_failed.with_label_values(&["subscribe"]),
2276                bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
2277            },
2278            unsubscribe: PubSubClientCallMetrics {
2279                succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
2280                failed: call_failed.with_label_values(&["unsubscribe"]),
2281                bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
2282            },
2283        }
2284    }
2285}
2286
/// A set of per-lock metrics, one [`LockMetrics`] for each tracked lock (or
/// lock access mode).
#[derive(Debug)]
pub struct LocksMetrics {
    // NOTE(review): which locks these correspond to, and what separates the
    // "cacheable" and "noncacheable" read paths, is decided by the code that
    // constructs and uses them (not visible here) — confirm there.
    pub(crate) applier_read_cacheable: LockMetrics,
    pub(crate) applier_read_noncacheable: LockMetrics,
    pub(crate) applier_write: LockMetrics,
    pub(crate) watch: LockMetrics,
}

/// Counters for a single tracked lock.
#[derive(Debug, Clone)]
pub struct LockMetrics {
    /// Presumably the number of times the lock was acquired — confirm at the
    /// call sites.
    pub(crate) acquire_count: IntCounter,
    /// Presumably the subset of acquisitions that had to wait for the lock —
    /// confirm at the call sites.
    pub(crate) blocking_acquire_count: IntCounter,
    /// Presumably the total seconds spent in blocking acquisitions — confirm
    /// at the call sites.
    pub(crate) blocking_seconds: Counter,
}
2301
2302#[derive(Debug)]
2303pub struct WatchMetrics {
2304    pub(crate) listen_woken_via_watch: IntCounter,
2305    pub(crate) listen_woken_via_sleep: IntCounter,
2306    pub(crate) listen_resolved_via_watch: IntCounter,
2307    pub(crate) listen_resolved_via_sleep: IntCounter,
2308    pub(crate) snapshot_woken_via_watch: IntCounter,
2309    pub(crate) snapshot_woken_via_sleep: IntCounter,
2310    pub(crate) notify_sent: IntCounter,
2311    pub(crate) notify_noop: IntCounter,
2312    pub(crate) notify_recv: IntCounter,
2313    pub(crate) notify_lagged: IntCounter,
2314    pub(crate) notify_wait_started: IntCounter,
2315    pub(crate) notify_wait_finished: IntCounter,
2316}
2317
2318impl WatchMetrics {
2319    fn new(registry: &MetricsRegistry) -> Self {
2320        WatchMetrics {
2321            listen_woken_via_watch: registry.register(metric!(
2322                name: "mz_persist_listen_woken_via_watch",
2323                help: "count of listen next batches wakes via watch notify",
2324            )),
2325            listen_woken_via_sleep: registry.register(metric!(
2326                name: "mz_persist_listen_woken_via_sleep",
2327                help: "count of listen next batches wakes via sleep",
2328            )),
2329            listen_resolved_via_watch: registry.register(metric!(
2330                name: "mz_persist_listen_resolved_via_watch",
2331                help: "count of listen next batches resolved via watch notify",
2332            )),
2333            listen_resolved_via_sleep: registry.register(metric!(
2334                name: "mz_persist_listen_resolved_via_sleep",
2335                help: "count of listen next batches resolved via sleep",
2336            )),
2337            snapshot_woken_via_watch: registry.register(metric!(
2338                name: "mz_persist_snapshot_woken_via_watch",
2339                help: "count of snapshot wakes via watch notify",
2340            )),
2341            snapshot_woken_via_sleep: registry.register(metric!(
2342                name: "mz_persist_snapshot_woken_via_sleep",
2343                help: "count of snapshot wakes via sleep",
2344            )),
2345            notify_sent: registry.register(metric!(
2346                name: "mz_persist_watch_notify_sent",
2347                help: "count of watch notifications sent to a non-empty broadcast channel",
2348            )),
2349            notify_noop: registry.register(metric!(
2350                name: "mz_persist_watch_notify_noop",
2351                help: "count of watch notifications sent to an broadcast channel",
2352            )),
2353            notify_recv: registry.register(metric!(
2354                name: "mz_persist_watch_notify_recv",
2355                help: "count of watch notifications received from the broadcast channel",
2356            )),
2357            notify_lagged: registry.register(metric!(
2358                name: "mz_persist_watch_notify_lagged",
2359                help: "count of lagged events in the watch notification broadcast channel",
2360            )),
2361            notify_wait_started: registry.register(metric!(
2362                name: "mz_persist_watch_notify_wait_started",
2363                help: "count of watch wait calls started",
2364            )),
2365            notify_wait_finished: registry.register(metric!(
2366                name: "mz_persist_watch_notify_wait_finished",
2367                help: "count of watch wait calls resolved",
2368            )),
2369        }
2370    }
2371}
2372
2373#[derive(Debug)]
2374pub struct PushdownMetrics {
2375    pub(crate) parts_filtered_count: IntCounter,
2376    pub(crate) parts_filtered_bytes: IntCounter,
2377    pub(crate) parts_fetched_count: IntCounter,
2378    pub(crate) parts_fetched_bytes: IntCounter,
2379    pub(crate) parts_audited_count: IntCounter,
2380    pub(crate) parts_audited_bytes: IntCounter,
2381    pub(crate) parts_inline_count: IntCounter,
2382    pub(crate) parts_inline_bytes: IntCounter,
2383    pub(crate) parts_faked_count: IntCounter,
2384    pub(crate) parts_faked_bytes: IntCounter,
2385    pub(crate) parts_stats_trimmed_count: IntCounter,
2386    pub(crate) parts_stats_trimmed_bytes: IntCounter,
2387    pub(crate) parts_projection_trimmed_bytes: IntCounter,
2388    pub part_stats: PartStatsMetrics,
2389}
2390
2391impl PushdownMetrics {
2392    fn new(registry: &MetricsRegistry) -> Self {
2393        PushdownMetrics {
2394            parts_filtered_count: registry.register(metric!(
2395                name: "mz_persist_pushdown_parts_filtered_count",
2396                help: "count of parts filtered by pushdown",
2397            )),
2398            parts_filtered_bytes: registry.register(metric!(
2399                name: "mz_persist_pushdown_parts_filtered_bytes",
2400                help: "total size of parts filtered by pushdown in bytes",
2401            )),
2402            parts_fetched_count: registry.register(metric!(
2403                name: "mz_persist_pushdown_parts_fetched_count",
2404                help: "count of parts not filtered by pushdown",
2405            )),
2406            parts_fetched_bytes: registry.register(metric!(
2407                name: "mz_persist_pushdown_parts_fetched_bytes",
2408                help: "total size of parts not filtered by pushdown in bytes",
2409            )),
2410            parts_audited_count: registry.register(metric!(
2411                name: "mz_persist_pushdown_parts_audited_count",
2412                help: "count of parts fetched only for pushdown audit",
2413            )),
2414            parts_audited_bytes: registry.register(metric!(
2415                name: "mz_persist_pushdown_parts_audited_bytes",
2416                help: "total size of parts fetched only for pushdown audit",
2417            )),
2418            parts_inline_count: registry.register(metric!(
2419                name: "mz_persist_pushdown_parts_inline_count",
2420                help: "count of parts not fetched because they were inline",
2421            )),
2422            parts_inline_bytes: registry.register(metric!(
2423                name: "mz_persist_pushdown_parts_inline_bytes",
2424                help: "total size of parts not fetched because they were inline",
2425            )),
2426            parts_faked_count: registry.register(metric!(
2427                name: "mz_persist_pushdown_parts_faked_count",
2428                help: "count of parts faked because of aggressive projection pushdown",
2429            )),
2430            parts_faked_bytes: registry.register(metric!(
2431                name: "mz_persist_pushdown_parts_faked_bytes",
2432                help: "total size of parts replaced with fakes by aggressive projection pushdown",
2433            )),
2434            parts_stats_trimmed_count: registry.register(metric!(
2435                name: "mz_persist_pushdown_parts_stats_trimmed_count",
2436                help: "count of trimmed part stats",
2437            )),
2438            parts_stats_trimmed_bytes: registry.register(metric!(
2439                name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
2440                help: "total bytes trimmed from part stats",
2441            )),
2442            parts_projection_trimmed_bytes: registry.register(metric!(
2443                name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
2444                help: "total bytes trimmed from columnar data because of projection pushdown",
2445            )),
2446            part_stats: PartStatsMetrics::new(registry),
2447        }
2448    }
2449}
2450
2451#[derive(Debug)]
2452pub struct ConsolidationMetrics {
2453    pub(crate) parts_fetched: IntCounter,
2454    pub(crate) parts_skipped: IntCounter,
2455    pub(crate) parts_wasted: IntCounter,
2456    pub(crate) wrong_sort: IntCounter,
2457}
2458
2459impl ConsolidationMetrics {
2460    fn new(registry: &MetricsRegistry) -> Self {
2461        ConsolidationMetrics {
2462            parts_fetched: registry.register(metric!(
2463                name: "mz_persist_consolidation_parts_fetched_count",
2464                help: "count of parts that were fetched and used during consolidation",
2465            )),
2466            parts_skipped: registry.register(metric!(
2467                name: "mz_persist_consolidation_parts_skipped_count",
2468                help: "count of parts that were never needed during consolidation",
2469            )),
2470            parts_wasted: registry.register(metric!(
2471                name: "mz_persist_consolidation_parts_wasted_count",
2472                help: "count of parts that were fetched but not needed during consolidation",
2473            )),
2474            wrong_sort: registry.register(metric!(
2475                name: "mz_persist_consolidation_wrong_sort_count",
2476                help: "count of runs that were sorted using the wrong ordering for the current consolidation",
2477            )),
2478        }
2479    }
2480}
2481
2482#[derive(Debug)]
2483pub struct BlobMemCache {
2484    pub(crate) size_blobs: UIntGauge,
2485    pub(crate) size_bytes: UIntGauge,
2486    pub(crate) hits_blobs: IntCounter,
2487    pub(crate) hits_bytes: IntCounter,
2488    pub(crate) evictions: IntCounter,
2489}
2490
2491impl BlobMemCache {
2492    fn new(registry: &MetricsRegistry) -> Self {
2493        BlobMemCache {
2494            size_blobs: registry.register(metric!(
2495                name: "mz_persist_blob_cache_size_blobs",
2496                help: "count of blobs in the cache",
2497                const_labels: {"cache" => "mem"},
2498            )),
2499            size_bytes: registry.register(metric!(
2500                name: "mz_persist_blob_cache_size_bytes",
2501                help: "total size of blobs in the cache",
2502                const_labels: {"cache" => "mem"},
2503            )),
2504            hits_blobs: registry.register(metric!(
2505                name: "mz_persist_blob_cache_hits_blobs",
2506                help: "count of blobs served via cache instead of s3",
2507                const_labels: {"cache" => "mem"},
2508            )),
2509            hits_bytes: registry.register(metric!(
2510                name: "mz_persist_blob_cache_hits_bytes",
2511                help: "total size of blobs served via cache instead of s3",
2512                const_labels: {"cache" => "mem"},
2513            )),
2514            evictions: registry.register(metric!(
2515                name: "mz_persist_blob_cache_evictions",
2516                help: "count of capacity-based cache evictions",
2517                const_labels: {"cache" => "mem"},
2518            )),
2519        }
2520    }
2521}
2522
2523#[derive(Debug)]
2524pub struct SemaphoreMetrics {
2525    cfg: PersistConfig,
2526    registry: MetricsRegistry,
2527    fetch: OnceCell<MetricsSemaphore>,
2528}
2529
2530impl SemaphoreMetrics {
2531    fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
2532        SemaphoreMetrics {
2533            cfg,
2534            registry,
2535            fetch: OnceCell::new(),
2536        }
2537    }
2538
2539    /// We can't easily change the number of permits, and the dyncfgs are all
2540    /// set to defaults on process start, so make sure we only initialize the
2541    /// semaphore once we've synced dyncfgs at least once.
2542    async fn fetch(&self) -> &MetricsSemaphore {
2543        if let Some(x) = self.fetch.get() {
2544            // Common case of already initialized avoids the cloning below.
2545            return x;
2546        }
2547        let cfg = self.cfg.clone();
2548        let registry = self.registry.clone();
2549        let init = async move {
2550            let total_permits = match cfg.announce_memory_limit {
2551                // Non-cc replicas have the old physical flow control mechanism,
2552                // so only apply this one on cc replicas.
2553                Some(mem) if cfg.is_cc_active => {
2554                    // We can't easily adjust the number of permits later, so
2555                    // make sure we've synced dyncfg values at least once.
2556                    info!("fetch semaphore awaiting first dyncfg values");
2557                    let () = cfg.configs_synced_once().await;
2558                    let total_permits = usize::cast_lossy(
2559                        f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
2560                    );
2561                    info!("fetch_semaphore got first dyncfg values");
2562                    total_permits
2563                }
2564                Some(_) | None => Semaphore::MAX_PERMITS,
2565            };
2566            MetricsSemaphore::new(&registry, "fetch", total_permits)
2567        };
2568        self.fetch.get_or_init(|| init).await
2569    }
2570
2571    pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
2572        // Adjust the requested permits to account for the difference between
2573        // encoded_size_bytes and the decoded size in lgalloc.
2574        let requested_permits = f64::cast_lossy(encoded_size_bytes);
2575        let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
2576        let requested_permits = usize::cast_lossy(requested_permits);
2577        self.fetch().await.acquire_permits(requested_permits).await
2578    }
2579}
2580
2581#[derive(Debug)]
2582pub struct MetricsSemaphore {
2583    name: &'static str,
2584    semaphore: Arc<Semaphore>,
2585    total_permits: usize,
2586    acquire_count: IntCounter,
2587    blocking_count: IntCounter,
2588    blocking_seconds: Counter,
2589    acquired_permits: IntCounter,
2590    released_permits: IntCounter,
2591    _available_permits: ComputedUIntGauge,
2592}
2593
2594impl MetricsSemaphore {
2595    pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
2596        let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
2597        // TODO: Sadly, tokio::sync::Semaphore makes it difficult to have a
2598        // dynamic total_permits count.
2599        let semaphore = Arc::new(Semaphore::new(total_permits));
2600        MetricsSemaphore {
2601            name,
2602            total_permits,
2603            acquire_count: registry.register(metric!(
2604                name: "mz_persist_semaphore_acquire_count",
2605                help: "count of acquire calls (not acquired permits count)",
2606                const_labels: {"name" => name},
2607            )),
2608            blocking_count: registry.register(metric!(
2609                name: "mz_persist_semaphore_blocking_count",
2610                help: "count of acquire calls that had to block",
2611                const_labels: {"name" => name},
2612            )),
2613            blocking_seconds: registry.register(metric!(
2614                name: "mz_persist_semaphore_blocking_seconds",
2615                help: "total time spent blocking on permit acquisition",
2616                const_labels: {"name" => name},
2617            )),
2618            acquired_permits: registry.register(metric!(
2619                name: "mz_persist_semaphore_acquired_permits",
2620                help: "total sum of acquired permits",
2621                const_labels: {"name" => name},
2622            )),
2623            released_permits: registry.register(metric!(
2624                name: "mz_persist_semaphore_released_permits",
2625                help: "total sum of released permits",
2626                const_labels: {"name" => name},
2627            )),
2628            _available_permits: registry.register_computed_gauge(
2629                metric!(
2630                    name: "mz_persist_semaphore_available_permits",
2631                    help: "currently available permits according to the semaphore",
2632                ),
2633                {
2634                    let semaphore = Arc::clone(&semaphore);
2635                    move || u64::cast_from(semaphore.available_permits())
2636                },
2637            ),
2638            semaphore,
2639        }
2640    }
2641
2642    pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
2643        // HACK: Cap the request at the total permit count. This prevents
2644        // deadlock, even if the cfg gets set to some small value.
2645        let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
2646        let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
2647        let requested_permits = std::cmp::min(requested_permits, total_permits);
2648        let wrap = |_permit| {
2649            self.acquired_permits.inc_by(u64::from(requested_permits));
2650            MetricsPermits {
2651                _permit,
2652                released_metric: self.released_permits.clone(),
2653                count: requested_permits,
2654            }
2655        };
2656
2657        // Special-case non-blocking happy path.
2658        self.acquire_count.inc();
2659        match Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
2660            Ok(x) => return wrap(x),
2661            Err(_) => {}
2662        };
2663
2664        // Sad path, gotta block.
2665        self.blocking_count.inc();
2666        let start = Instant::now();
2667        let ret = Arc::clone(&self.semaphore)
2668            .acquire_many_owned(requested_permits)
2669            .instrument(info_span!("acquire_permits"))
2670            .await;
2671        let elapsed = start.elapsed();
2672        self.blocking_seconds.inc_by(elapsed.as_secs_f64());
2673        debug!(
2674            "acquisition of {} {} permits blocked for {:?}",
2675            self.name, requested_permits, elapsed
2676        );
2677        wrap(ret.expect("semaphore is never closed"))
2678    }
2679}
2680
2681#[derive(Debug)]
2682pub struct MetricsPermits {
2683    _permit: OwnedSemaphorePermit,
2684    released_metric: IntCounter,
2685    count: u32,
2686}
2687
2688impl Drop for MetricsPermits {
2689    fn drop(&mut self) {
2690        self.released_metric.inc_by(u64::from(self.count))
2691    }
2692}
2693
2694#[derive(Debug)]
2695pub struct ExternalOpMetrics {
2696    started: IntCounter,
2697    succeeded: IntCounter,
2698    failed: IntCounter,
2699    bytes: IntCounter,
2700    seconds: Counter,
2701    seconds_histogram: Option<Histogram>,
2702    alerts_metrics: Arc<AlertsMetrics>,
2703}
2704
2705impl ExternalOpMetrics {
2706    async fn run_op<R, F, OpFn, ErrFn>(
2707        &self,
2708        op_fn: OpFn,
2709        on_err_fn: ErrFn,
2710    ) -> Result<R, ExternalError>
2711    where
2712        F: std::future::Future<Output = Result<R, ExternalError>>,
2713        OpFn: FnOnce() -> F,
2714        ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
2715    {
2716        self.started.inc();
2717        let start = Instant::now();
2718        let res = op_fn().await;
2719        let elapsed_seconds = start.elapsed().as_secs_f64();
2720        self.seconds.inc_by(elapsed_seconds);
2721        if let Some(h) = &self.seconds_histogram {
2722            h.observe(elapsed_seconds);
2723        }
2724        match res.as_ref() {
2725            Ok(_) => self.succeeded.inc(),
2726            Err(err) => {
2727                self.failed.inc();
2728                on_err_fn(&self.alerts_metrics, err);
2729            }
2730        };
2731        res
2732    }
2733
2734    fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
2735        &'a self,
2736        op_fn: OpFn,
2737        mut on_err_fn: ErrFn,
2738    ) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
2739    where
2740        S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
2741        OpFn: FnOnce() -> S,
2742        ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
2743    {
2744        self.started.inc();
2745        let start = Instant::now();
2746        let mut stream = op_fn();
2747        stream! {
2748            let mut succeeded = true;
2749            while let Some(res) = stream.next().await {
2750                if let Err(err) = res.as_ref() {
2751                    on_err_fn(&self.alerts_metrics, err);
2752                    succeeded = false;
2753                }
2754                yield res;
2755            }
2756            if succeeded {
2757                self.succeeded.inc()
2758            } else {
2759                self.failed.inc()
2760            }
2761            let elapsed_seconds = start.elapsed().as_secs_f64();
2762            self.seconds.inc_by(elapsed_seconds);
2763            if let Some(h) = &self.seconds_histogram {
2764                h.observe(elapsed_seconds);
2765            }
2766        }
2767    }
2768}
2769
/// Per-operation metrics for the calls that `MetricsBlob` forwards to a [Blob].
#[derive(Debug)]
pub struct BlobMetrics {
    /// Metrics for `Blob::set`; `bytes` counts the sizes of successful writes.
    set: ExternalOpMetrics,
    /// Metrics for `Blob::get`; `bytes` counts the sizes of values found.
    get: ExternalOpMetrics,
    /// Metrics for `Blob::list_keys_and_metadata`; `bytes` counts key lengths.
    list_keys: ExternalOpMetrics,
    /// Metrics for `Blob::delete`; `bytes` counts the sizes reported as deleted.
    delete: ExternalOpMetrics,
    /// Metrics for `Blob::restore`.
    restore: ExternalOpMetrics,
    /// Number of successful deletes that returned `None` (nothing was deleted).
    delete_noop: IntCounter,
    /// Sizes, in bytes, of successfully written blobs.
    blob_sizes: Histogram,
    // NOTE(review): not written in this section; presumably a round-trip latency
    // measurement against blob storage updated elsewhere — confirm.
    pub rtt_latency: Gauge,
}
2781
2782#[derive(Debug)]
2783pub struct MetricsBlob {
2784    blob: Arc<dyn Blob>,
2785    metrics: Arc<Metrics>,
2786}
2787
2788impl MetricsBlob {
2789    pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
2790        MetricsBlob { blob, metrics }
2791    }
2792
2793    fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
2794        alerts_metrics.blob_failures.inc()
2795    }
2796}
2797
2798#[async_trait]
2799impl Blob for MetricsBlob {
2800    #[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
2801    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
2802        let res = self
2803            .metrics
2804            .blob
2805            .get
2806            .run_op(|| self.blob.get(key), Self::on_err)
2807            .await;
2808        if let Ok(Some(value)) = res.as_ref() {
2809            self.metrics
2810                .blob
2811                .get
2812                .bytes
2813                .inc_by(u64::cast_from(value.len()));
2814        }
2815        res
2816    }
2817
2818    #[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
2819    async fn list_keys_and_metadata(
2820        &self,
2821        key_prefix: &str,
2822        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
2823    ) -> Result<(), ExternalError> {
2824        let mut byte_total = 0;
2825        let mut instrumented = |blob_metadata: BlobMetadata| {
2826            // Track the size of the _keys_, not the blobs, so that we get a
2827            // sense for how much network bandwidth these calls are using.
2828            byte_total += blob_metadata.key.len();
2829            f(blob_metadata)
2830        };
2831
2832        let res = self
2833            .metrics
2834            .blob
2835            .list_keys
2836            .run_op(
2837                || {
2838                    self.blob
2839                        .list_keys_and_metadata(key_prefix, &mut instrumented)
2840                },
2841                Self::on_err,
2842            )
2843            .await;
2844
2845        self.metrics
2846            .blob
2847            .list_keys
2848            .bytes
2849            .inc_by(u64::cast_from(byte_total));
2850
2851        res
2852    }
2853
2854    #[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
2855    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
2856        let bytes = value.len();
2857        let res = self
2858            .metrics
2859            .blob
2860            .set
2861            .run_op(|| self.blob.set(key, value), Self::on_err)
2862            .await;
2863        if res.is_ok() {
2864            self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
2865            self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
2866        }
2867        res
2868    }
2869
2870    #[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
2871    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
2872        let bytes = self
2873            .metrics
2874            .blob
2875            .delete
2876            .run_op(|| self.blob.delete(key), Self::on_err)
2877            .await?;
2878        if let Some(bytes) = bytes {
2879            self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
2880        } else {
2881            self.metrics.blob.delete_noop.inc();
2882        }
2883        Ok(bytes)
2884    }
2885
2886    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
2887        self.metrics
2888            .blob
2889            .restore
2890            .run_op(|| self.blob.restore(key), Self::on_err)
2891            .await
2892    }
2893}
2894
/// Per-operation metrics for the calls that `MetricsConsensus` forwards to a
/// [Consensus] implementation.
#[derive(Debug)]
pub struct ConsensusMetrics {
    /// Metrics for `Consensus::list_keys` (recorded via `run_stream`).
    list_keys: ExternalOpMetrics,
    /// Metrics for `Consensus::head`; `bytes` counts the data of found entries.
    head: ExternalOpMetrics,
    /// Metrics for `Consensus::compare_and_set`; `bytes` counts committed data.
    compare_and_set: ExternalOpMetrics,
    /// Metrics for `Consensus::scan`; `bytes` counts the data of returned entries.
    scan: ExternalOpMetrics,
    /// Metrics for `Consensus::truncate`.
    truncate: ExternalOpMetrics,
    /// Sum of the counts returned by successful `Consensus::truncate` calls.
    truncated_count: IntCounter,
    // NOTE(review): not written in this section; presumably a round-trip latency
    // measurement against consensus updated elsewhere — confirm.
    pub rtt_latency: Gauge,
}
2905
2906#[derive(Debug)]
2907pub struct MetricsConsensus {
2908    consensus: Arc<dyn Consensus>,
2909    metrics: Arc<Metrics>,
2910}
2911
2912impl MetricsConsensus {
2913    pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
2914        MetricsConsensus { consensus, metrics }
2915    }
2916
2917    fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
2918        // As of 2022-09-06, regular determinate errors are expected in
2919        // Consensus (i.e. "txn conflict, please retry"), so only count the
2920        // indeterminate ones.
2921        if let ExternalError::Indeterminate(_) = err {
2922            alerts_metrics.consensus_failures.inc()
2923        }
2924    }
2925}
2926
2927#[async_trait]
2928impl Consensus for MetricsConsensus {
2929    fn list_keys(&self) -> ResultStream<'_, String> {
2930        Box::pin(
2931            self.metrics
2932                .consensus
2933                .list_keys
2934                .run_stream(|| self.consensus.list_keys(), Self::on_err),
2935        )
2936    }
2937
2938    #[instrument(name = "consensus::head", fields(shard=key))]
2939    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
2940        let res = self
2941            .metrics
2942            .consensus
2943            .head
2944            .run_op(|| self.consensus.head(key), Self::on_err)
2945            .await;
2946        if let Ok(Some(data)) = res.as_ref() {
2947            self.metrics
2948                .consensus
2949                .head
2950                .bytes
2951                .inc_by(u64::cast_from(data.data.len()));
2952        }
2953        res
2954    }
2955
2956    #[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
2957    async fn compare_and_set(
2958        &self,
2959        key: &str,
2960        expected: Option<SeqNo>,
2961        new: VersionedData,
2962    ) -> Result<CaSResult, ExternalError> {
2963        let bytes = new.data.len();
2964        let res = self
2965            .metrics
2966            .consensus
2967            .compare_and_set
2968            .run_op(
2969                || self.consensus.compare_and_set(key, expected, new),
2970                Self::on_err,
2971            )
2972            .await;
2973        match res.as_ref() {
2974            Ok(CaSResult::Committed) => self
2975                .metrics
2976                .consensus
2977                .compare_and_set
2978                .bytes
2979                .inc_by(u64::cast_from(bytes)),
2980            Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
2981        }
2982        res
2983    }
2984
2985    #[instrument(name = "consensus::scan", fields(shard=key))]
2986    async fn scan(
2987        &self,
2988        key: &str,
2989        from: SeqNo,
2990        limit: usize,
2991    ) -> Result<Vec<VersionedData>, ExternalError> {
2992        let res = self
2993            .metrics
2994            .consensus
2995            .scan
2996            .run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
2997            .await;
2998        if let Ok(dataz) = res.as_ref() {
2999            let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
3000            self.metrics
3001                .consensus
3002                .scan
3003                .bytes
3004                .inc_by(u64::cast_from(bytes));
3005        }
3006        res
3007    }
3008
3009    #[instrument(name = "consensus::truncate", fields(shard=key))]
3010    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
3011        let deleted = self
3012            .metrics
3013            .consensus
3014            .truncate
3015            .run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
3016            .await?;
3017        self.metrics
3018            .consensus
3019            .truncated_count
3020            .inc_by(u64::cast_from(deleted));
3021        Ok(deleted)
3022    }
3023}
3024
3025/// A standard set of metrics for an async task. Call [TaskMetrics::instrument_task] to instrument
3026/// a future and report its metrics for this task type.
3027#[derive(Debug, Clone)]
3028pub struct TaskMetrics {
3029    f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
3030    u64_gauges: Vec<(
3031        GenericGauge<AtomicU64>,
3032        fn(&tokio_metrics::TaskMetrics) -> u64,
3033    )>,
3034    monitor: TaskMonitor,
3035}
3036
3037impl TaskMetrics {
3038    pub fn new(name: &str) -> Self {
3039        let monitor = TaskMonitor::new();
3040        Self {
3041            f64_gauges: vec![
3042                (
3043                    Gauge::make_collector(metric!(
3044                        name: "mz_persist_task_total_idle_duration",
3045                        help: "Seconds of time spent idling, ie. waiting for a task to be woken up.",
3046                        const_labels: {"name" => name}
3047                    )),
3048                    |m| m.total_idle_duration.as_secs_f64(),
3049                ),
3050                (
3051                    Gauge::make_collector(metric!(
3052                        name: "mz_persist_task_total_scheduled_duration",
3053                        help: "Seconds of time spent scheduled, ie. ready to poll but not yet polled.",
3054                        const_labels: {"name" => name}
3055                    )),
3056                    |m| m.total_scheduled_duration.as_secs_f64(),
3057                ),
3058            ],
3059            u64_gauges: vec![
3060                (
3061                    MakeCollector::make_collector(metric!(
3062                        name: "mz_persist_task_total_scheduled_count",
3063                        help: "The total number of task schedules. Useful for computing the average scheduled time.",
3064                        const_labels: {"name" => name}
3065                    )),
3066                    |m| m.total_scheduled_count,
3067                ),
3068                (
3069                    MakeCollector::make_collector(metric!(
3070                        name: "mz_persist_task_total_idled_count",
3071                        help: "The total number of task idles. Useful for computing the average idle time.",
3072                        const_labels: {"name" => name}
3073                    ,
3074                    )),
3075                    |m| m.total_idled_count,
3076                ),
3077            ],
3078            monitor,
3079        }
3080    }
3081
3082    /// Instrument the provided future. The expectation is that the result will be executed
3083    /// as a task. (See [TaskMonitor::instrument] for more context.)
3084    pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
3085        TaskMonitor::instrument(&self.monitor, task)
3086    }
3087}
3088
3089impl Collector for TaskMetrics {
3090    fn desc(&self) -> Vec<&Desc> {
3091        let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3092        for (g, _) in &self.f64_gauges {
3093            descs.extend(g.desc());
3094        }
3095        for (g, _) in &self.u64_gauges {
3096            descs.extend(g.desc());
3097        }
3098        descs
3099    }
3100
3101    fn collect(&self) -> Vec<MetricFamily> {
3102        let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3103        let metrics = self.monitor.cumulative();
3104        for (g, metrics_fn) in &self.f64_gauges {
3105            g.set(metrics_fn(&metrics));
3106            families.extend(g.collect());
3107        }
3108        for (g, metrics_fn) in &self.u64_gauges {
3109            g.set(metrics_fn(&metrics));
3110            families.extend(g.collect());
3111        }
3112        families
3113    }
3114}
3115
3116#[derive(Debug)]
3117pub struct TasksMetrics {
3118    pub heartbeat_read: TaskMetrics,
3119}
3120
3121impl TasksMetrics {
3122    fn new(registry: &MetricsRegistry) -> Self {
3123        let heartbeat_read = TaskMetrics::new("heartbeat_read");
3124        registry.register_collector(heartbeat_read.clone());
3125        TasksMetrics { heartbeat_read }
3126    }
3127}
3128
/// Metrics for persist's schema cache and for fetch-part schema migrations.
#[derive(Debug)]
pub struct SchemaMetrics {
    /// Count of state fetches by the schema cache.
    pub(crate) cache_fetch_state_count: IntCounter,
    /// Schema cache counters labeled `op="schema"`.
    pub(crate) cache_schema: SchemaCacheMetrics,
    /// Schema cache counters labeled `op="migration"`.
    pub(crate) cache_migration: SchemaCacheMetrics,
    /// Fetch part migrations labeled `op="same"`.
    pub(crate) migration_count_same: IntCounter,
    /// Fetch part migrations labeled `op="codec"`.
    pub(crate) migration_count_codec: IntCounter,
    /// Fetch part migrations labeled `op="either"`.
    pub(crate) migration_count_either: IntCounter,
    /// Migrated update records labeled `op="legacy_codec"`.
    pub(crate) migration_len_legacy_codec: IntCounter,
    /// Migrated update records labeled `op="either_codec"`.
    pub(crate) migration_len_either_codec: IntCounter,
    /// Migrated update records labeled `op="either_arrow"`.
    pub(crate) migration_len_either_arrow: IntCounter,
    /// Count of migrations constructed.
    pub(crate) migration_new_count: IntCounter,
    /// Seconds spent constructing migration logic.
    pub(crate) migration_new_seconds: Counter,
    /// Seconds spent applying migration logic.
    pub(crate) migration_migrate_seconds: Counter,
}

impl SchemaMetrics {
    /// Registers all schema cache and migration metrics with `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        // Schema cache counter vecs, each split by the `op` label (one label
        // value per cache; see `cache` below).
        let cached: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_cached_count",
            help: "count of schema cache entries served from cache",
            var_labels: ["op"],
        ));
        let computed: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_computed_count",
            help: "count of schema cache entries computed",
            var_labels: ["op"],
        ));
        let unavailable: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_unavailable_count",
            help: "count of schema cache entries unavailable at current state",
            var_labels: ["op"],
        ));
        let added: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_added_count",
            help: "count of schema cache entries added",
            var_labels: ["op"],
        ));
        let dropped: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_dropped_count",
            help: "count of schema cache entries dropped",
            var_labels: ["op"],
        ));
        // Builds one cache's counters by selecting `name` as the `op` label
        // value from each vec above.
        let cache = |name| SchemaCacheMetrics {
            cached_count: cached.with_label_values(&[name]),
            computed_count: computed.with_label_values(&[name]),
            unavailable_count: unavailable.with_label_values(&[name]),
            added_count: added.with_label_values(&[name]),
            dropped_count: dropped.with_label_values(&[name]),
        };
        // Migration counters, split by the kind of migration in the `op` label.
        let migration_count: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_migration_count",
            help: "count of fetch part migrations",
            var_labels: ["op"],
        ));
        let migration_len: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_migration_len",
            help: "count of migrated update records",
            var_labels: ["op"],
        ));
        SchemaMetrics {
            cache_fetch_state_count: registry.register(metric!(
                name: "mz_persist_schema_cache_fetch_state_count",
                help: "count of state fetches by the schema cache",
            )),
            cache_schema: cache("schema"),
            cache_migration: cache("migration"),
            migration_count_same: migration_count.with_label_values(&["same"]),
            migration_count_codec: migration_count.with_label_values(&["codec"]),
            migration_count_either: migration_count.with_label_values(&["either"]),
            migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
            migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
            migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
            migration_new_count: registry.register(metric!(
                name: "mz_persist_schema_migration_new_count",
                help: "count of migrations constructed",
            )),
            migration_new_seconds: registry.register(metric!(
                name: "mz_persist_schema_migration_new_seconds",
                help: "seconds spent constructing migration logic",
            )),
            migration_migrate_seconds: registry.register(metric!(
                name: "mz_persist_schema_migration_migrate_seconds",
                help: "seconds spent applying migration logic",
            )),
        }
    }
}
3217
/// Counters for one schema cache, i.e. one value of the `op` label on the
/// `mz_persist_schema_cache_*` metrics.
#[derive(Debug, Clone)]
pub struct SchemaCacheMetrics {
    /// Entries served from cache.
    pub(crate) cached_count: IntCounter,
    /// Entries computed.
    pub(crate) computed_count: IntCounter,
    /// Entries unavailable at the current state.
    pub(crate) unavailable_count: IntCounter,
    /// Entries added.
    pub(crate) added_count: IntCounter,
    /// Entries dropped.
    pub(crate) dropped_count: IntCounter,
}
3226
3227#[derive(Debug)]
3228pub struct InlineMetrics {
3229    pub(crate) part_commit_count: IntCounter,
3230    pub(crate) part_commit_bytes: IntCounter,
3231    pub(crate) backpressure: BatchWriteMetrics,
3232}
3233
3234impl InlineMetrics {
3235    fn new(registry: &MetricsRegistry) -> Self {
3236        InlineMetrics {
3237            part_commit_count: registry.register(metric!(
3238                name: "mz_persist_inline_part_commit_count",
3239                help: "count of inline parts committed to state",
3240            )),
3241            part_commit_bytes: registry.register(metric!(
3242                name: "mz_persist_inline_part_commit_bytes",
3243                help: "total size of of inline parts committed to state",
3244            )),
3245            backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
3246        }
3247    }
3248}
3249
3250fn blob_key_shard_id(key: &str) -> Option<String> {
3251    let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
3252    Some(shard_id.to_string())
3253}
3254
3255/// Encode a frontier into an i64 acceptable for use in metrics.
3256pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
3257    // We have two problems in mapping a persist frontier into a metric.
3258    // First is that we only have a `T: Timestamp+Codec64`. Second, is
3259    // mapping an antichain to a single counter value. We solve both by
3260    // taking advantage of the fact that in practice, timestamps in mz are
3261    // currently always a u64 (and if we switch them, it will be to an i64).
3262    // This means that for all values that mz would actually produce,
3263    // interpreting the encoded bytes as a little-endian i64 will work.
3264    // Both of them impl PartialOrder, so in practice, there will always be
3265    // zero or one elements in the antichain.
3266    match ts.elements().first() {
3267        Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
3268        None => i64::MAX,
3269    }
3270}