1use async_stream::stream;
13use mz_persist_types::stats::PartStatsMetrics;
14use std::collections::BTreeMap;
15use std::sync::{Arc, Mutex, Weak};
16use std::time::{Duration, Instant};
17use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::instrument;
25use mz_ore::metric;
26use mz_ore::metrics::{
27 ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
28 DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
29 UIntGaugeVec, raw,
30};
31use mz_ore::stats::histogram_seconds_buckets;
32use mz_persist::location::{
33 Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
34};
35use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
36use mz_persist::retry::RetryStream;
37use mz_persist_types::Codec64;
38use mz_postgres_client::metrics::PostgresClientMetrics;
39use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
40use prometheus::proto::MetricFamily;
41use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
42use timely::progress::Antichain;
43use tokio_metrics::TaskMonitor;
44use tracing::{Instrument, debug, info, info_span};
45
46use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
47use crate::internal::paths::BlobKey;
48use crate::{PersistConfig, ShardId};
49
/// Root collection of all persist metrics for a process.
///
/// Constructed once per process via [`Metrics::new`] and shared; all
/// sub-structs hold already-registered prometheus collectors.
pub struct Metrics {
    // Kept alive only so the underlying metric vecs stay registered.
    _vecs: MetricsVecs,
    // Kept alive so the uptime gauge keeps reporting.
    _uptime: ComputedGauge,

    /// Metrics for [Blob] usage.
    pub blob: BlobMetrics,
    /// Metrics for [Consensus] usage.
    pub consensus: ConsensusMetrics,
    /// Metrics of command evaluation.
    pub cmds: CmdsMetrics,
    /// Metrics for each retry loop.
    pub retries: RetriesMetrics,
    /// Metrics for user-initiated batch writes.
    pub user: BatchWriteMetrics,
    /// Metrics for reading batch parts.
    pub read: BatchPartReadMetrics,
    /// Metrics for compaction.
    pub compaction: CompactionMetrics,
    /// Metrics for garbage collection.
    pub gc: GcMetrics,
    /// Metrics for leasing and automatic lease expiry.
    pub lease: LeaseMetrics,
    /// Metrics for various encodings and decodings.
    pub codecs: CodecsMetrics,
    /// Metrics for state vs as-of operations.
    pub state: StateMetrics,
    /// Metrics tracked on a per-shard basis.
    pub shards: ShardsMetrics,
    /// Metrics for storage usage audits.
    pub audit: UsageAuditMetrics,
    /// Metrics for locking.
    pub locks: LocksMetrics,
    /// Metrics for StateWatch.
    pub watch: WatchMetrics,
    /// Metrics for PubSub client usage.
    pub pubsub_client: PubSubClientMetrics,
    /// Metrics for mfp/filter pushdown.
    pub pushdown: PushdownMetrics,
    /// Metrics for consolidation.
    pub consolidation: ConsolidationMetrics,
    /// Metrics for the in-memory blob cache.
    pub blob_cache_mem: BlobMemCache,
    /// Metrics for tokio tasks.
    pub tasks: TasksMetrics,
    /// Metrics for columnar data encoding/decoding.
    pub columnar: ColumnarMetrics,
    /// Metrics for schema operations.
    pub schema: SchemaMetrics,
    /// Metrics for inline batch parts.
    pub inline: InlineMetrics,
    /// Metrics for the fetch semaphore.
    pub(crate) semaphore: SemaphoreMetrics,

    /// Metrics for the persist sink.
    pub sink: SinkMetrics,

    /// Metrics for S3-backed blob implementation.
    pub s3_blob: S3BlobMetrics,
    /// Metrics for Postgres-backed consensus implementation.
    pub postgres_consensus: PostgresClientMetrics,

    #[allow(dead_code)]
    pub(crate) registry: MetricsRegistry,
}
119
120impl std::fmt::Debug for Metrics {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("Metrics").finish_non_exhaustive()
123 }
124}
125
126impl Metrics {
127 pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
129 let vecs = MetricsVecs::new(registry);
130 let start = Instant::now();
131 let uptime = registry.register_computed_gauge(
132 metric!(
133 name: "mz_persist_metadata_seconds",
134 help: "server uptime, labels are build metadata",
135 const_labels: {
136 "version" => cfg.build_version,
137 "build_type" => if cfg!(release) { "release" } else { "debug" }
138 },
139 ),
140 move || start.elapsed().as_secs_f64(),
141 );
142 let s3_blob = S3BlobMetrics::new(registry);
143 let columnar = ColumnarMetrics::new(
144 registry,
145 &s3_blob.lgbytes,
146 Arc::clone(&cfg.configs),
147 cfg.is_cc_active,
148 );
149 Metrics {
150 blob: vecs.blob_metrics(),
151 consensus: vecs.consensus_metrics(),
152 cmds: vecs.cmds_metrics(registry),
153 retries: vecs.retries_metrics(),
154 codecs: vecs.codecs_metrics(),
155 user: BatchWriteMetrics::new(registry, "user"),
156 read: vecs.batch_part_read_metrics(),
157 compaction: CompactionMetrics::new(registry),
158 gc: GcMetrics::new(registry),
159 lease: LeaseMetrics::new(registry),
160 state: StateMetrics::new(registry),
161 shards: ShardsMetrics::new(registry),
162 audit: UsageAuditMetrics::new(registry),
163 locks: vecs.locks_metrics(),
164 watch: WatchMetrics::new(registry),
165 pubsub_client: PubSubClientMetrics::new(registry),
166 pushdown: PushdownMetrics::new(registry),
167 consolidation: ConsolidationMetrics::new(registry),
168 blob_cache_mem: BlobMemCache::new(registry),
169 tasks: TasksMetrics::new(registry),
170 columnar,
171 schema: SchemaMetrics::new(registry),
172 inline: InlineMetrics::new(registry),
173 semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
174 sink: SinkMetrics::new(registry),
175 s3_blob,
176 postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
177 _vecs: vecs,
178 _uptime: uptime,
179 registry: registry.clone(),
180 }
181 }
182
183 pub fn write_amplification(&self) -> f64 {
188 let total_written = self.blob.set.bytes.get();
191 let user_written = self.user.goodbytes.get();
192 #[allow(clippy::as_conversions)]
193 {
194 total_written as f64 / user_written as f64
195 }
196 }
197}
198
/// Label-vectored metrics from which the per-label metric structs
/// (commands, external ops, retries, codecs, reads, locks) are carved out
/// via `with_label_values` in the `impl MetricsVecs` helpers.
#[derive(Debug)]
struct MetricsVecs {
    // Command lifecycle counters/timers, labeled by command name.
    cmd_started: IntCounterVec,
    cmd_cas_mismatch: IntCounterVec,
    cmd_succeeded: IntCounterVec,
    cmd_failed: IntCounterVec,
    cmd_seconds: CounterVec,

    // External service (blob/consensus) call counters, labeled by op.
    external_op_started: IntCounterVec,
    external_op_succeeded: IntCounterVec,
    external_op_failed: IntCounterVec,
    external_op_bytes: IntCounterVec,
    external_op_seconds: CounterVec,
    external_consensus_truncated_count: IntCounter,
    external_blob_delete_noop_count: IntCounter,
    external_blob_sizes: Histogram,
    external_rtt_latency: GaugeVec,
    external_op_latency: HistogramVec,

    // Retry-loop counters, labeled by retry-loop name.
    retry_started: IntCounterVec,
    retry_finished: IntCounterVec,
    retry_retries: IntCounterVec,
    retry_sleep_seconds: CounterVec,

    // Codec encode/decode counters, labeled by codec op.
    encode_count: IntCounterVec,
    encode_seconds: CounterVec,
    decode_count: IntCounterVec,
    decode_seconds: CounterVec,

    // Batch-part read counters, labeled by read path.
    read_part_bytes: IntCounterVec,
    read_part_goodbytes: IntCounterVec,
    read_part_count: IntCounterVec,
    read_part_seconds: CounterVec,
    read_ts_rewrite: IntCounterVec,

    // Lock acquisition counters, labeled by lock name.
    lock_acquire_count: IntCounterVec,
    lock_blocking_acquire_count: IntCounterVec,
    lock_blocking_seconds: CounterVec,

    // Shared so each ExternalOpMetrics can bump alert counters.
    alerts_metrics: Arc<AlertsMetrics>,
}
241
impl MetricsVecs {
    /// Registers every label-vectored metric with `registry`.
    ///
    /// Metric name strings here are load-bearing: they are scraped by
    /// dashboards/alerts, so existing typos (e.g. "rewite", "rountrip")
    /// must not be "fixed" without a coordinated rename.
    fn new(registry: &MetricsRegistry) -> Self {
        MetricsVecs {
            cmd_started: registry.register(metric!(
                name: "mz_persist_cmd_started_count",
                help: "count of commands started",
                var_labels: ["cmd"],
            )),
            cmd_cas_mismatch: registry.register(metric!(
                name: "mz_persist_cmd_cas_mismatch_count",
                help: "count of command retries from CaS mismatch",
                var_labels: ["cmd"],
            )),
            cmd_succeeded: registry.register(metric!(
                name: "mz_persist_cmd_succeeded_count",
                help: "count of commands succeeded",
                var_labels: ["cmd"],
            )),
            cmd_failed: registry.register(metric!(
                name: "mz_persist_cmd_failed_count",
                help: "count of commands failed",
                var_labels: ["cmd"],
            )),
            cmd_seconds: registry.register(metric!(
                name: "mz_persist_cmd_seconds",
                help: "time spent applying commands",
                var_labels: ["cmd"],
            )),

            external_op_started: registry.register(metric!(
                name: "mz_persist_external_started_count",
                help: "count of external service calls started",
                var_labels: ["op"],
            )),
            external_op_succeeded: registry.register(metric!(
                name: "mz_persist_external_succeeded_count",
                help: "count of external service calls succeeded",
                var_labels: ["op"],
            )),
            external_op_failed: registry.register(metric!(
                name: "mz_persist_external_failed_count",
                help: "count of external service calls failed",
                var_labels: ["op"],
            )),
            external_op_bytes: registry.register(metric!(
                name: "mz_persist_external_bytes_count",
                help: "total size represented by external service calls",
                var_labels: ["op"],
            )),
            external_op_seconds: registry.register(metric!(
                name: "mz_persist_external_seconds",
                help: "time spent in external service calls",
                var_labels: ["op"],
            )),
            external_consensus_truncated_count: registry.register(metric!(
                name: "mz_persist_external_consensus_truncated_count",
                help: "count of versions deleted by consensus truncate calls",
            )),
            external_blob_delete_noop_count: registry.register(metric!(
                name: "mz_persist_external_blob_delete_noop_count",
                help: "count of blob delete calls that deleted a non-existent key",
            )),
            external_blob_sizes: registry.register(metric!(
                name: "mz_persist_external_blob_sizes",
                help: "histogram of blob sizes at put time",
                buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
            )),
            external_rtt_latency: registry.register(metric!(
                name: "mz_persist_external_rtt_latency",
                help: "roundtrip-time to external service as seen by this process",
                var_labels: ["external"],
            )),
            external_op_latency: registry.register(metric!(
                name: "mz_persist_external_op_latency",
                // NOTE(review): "rountrip" typo in the help text; harmless but
                // kept as-is since help strings may be asserted on in tooling.
                help: "rountrip latency observed by individual performance-critical operations",
                var_labels: ["op"],
                // Bucket from 500us up to 32s to cover both fast gets and
                // pathological tail latencies.
                buckets: histogram_seconds_buckets(0.000_500, 32.0),
            )),

            retry_started: registry.register(metric!(
                name: "mz_persist_retry_started_count",
                help: "count of retry loops started",
                var_labels: ["op"],
            )),
            retry_finished: registry.register(metric!(
                name: "mz_persist_retry_finished_count",
                help: "count of retry loops finished",
                var_labels: ["op"],
            )),
            retry_retries: registry.register(metric!(
                name: "mz_persist_retry_retries_count",
                help: "count of total attempts by retry loops",
                var_labels: ["op"],
            )),
            retry_sleep_seconds: registry.register(metric!(
                name: "mz_persist_retry_sleep_seconds",
                help: "time spent in retry loop backoff",
                var_labels: ["op"],
            )),

            encode_count: registry.register(metric!(
                name: "mz_persist_encode_count",
                help: "count of op encodes",
                var_labels: ["op"],
            )),
            encode_seconds: registry.register(metric!(
                name: "mz_persist_encode_seconds",
                help: "time spent in op encodes",
                var_labels: ["op"],
            )),
            decode_count: registry.register(metric!(
                name: "mz_persist_decode_count",
                help: "count of op decodes",
                var_labels: ["op"],
            )),
            decode_seconds: registry.register(metric!(
                name: "mz_persist_decode_seconds",
                help: "time spent in op decodes",
                var_labels: ["op"],
            )),

            read_part_bytes: registry.register(metric!(
                name: "mz_persist_read_batch_part_bytes",
                help: "total encoded size of batch parts read",
                var_labels: ["op"],
            )),
            read_part_goodbytes: registry.register(metric!(
                name: "mz_persist_read_batch_part_goodbytes",
                help: "total logical size of batch parts read",
                var_labels: ["op"],
            )),
            read_part_count: registry.register(metric!(
                name: "mz_persist_read_batch_part_count",
                help: "count of batch parts read",
                var_labels: ["op"],
            )),
            read_part_seconds: registry.register(metric!(
                name: "mz_persist_read_batch_part_seconds",
                help: "time spent reading batch parts",
                var_labels: ["op"],
            )),
            // NOTE(review): metric name has a "rewite" typo; renaming would
            // break existing dashboards, so it stays.
            read_ts_rewrite: registry.register(metric!(
                name: "mz_persist_read_ts_rewite",
                help: "count of updates read with rewritten ts",
                var_labels: ["op"],
            )),

            lock_acquire_count: registry.register(metric!(
                name: "mz_persist_lock_acquire_count",
                help: "count of locks acquired",
                var_labels: ["op"],
            )),
            lock_blocking_acquire_count: registry.register(metric!(
                name: "mz_persist_lock_blocking_acquire_count",
                help: "count of locks acquired that required blocking",
                var_labels: ["op"],
            )),
            lock_blocking_seconds: registry.register(metric!(
                name: "mz_persist_lock_blocking_seconds",
                help: "time spent blocked for a lock",
                var_labels: ["op"],
            )),

            alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
        }
    }

    /// Carves out per-command metrics for every state-machine command, plus a
    /// couple of command-adjacent standalone counters.
    fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
        CmdsMetrics {
            init_state: self.cmd_metrics("init_state"),
            add_rollup: self.cmd_metrics("add_rollup"),
            remove_rollups: self.cmd_metrics("remove_rollups"),
            register: self.cmd_metrics("register"),
            compare_and_append: self.cmd_metrics("compare_and_append"),
            compare_and_append_noop: registry.register(metric!(
                name: "mz_persist_cmd_compare_and_append_noop",
                // NOTE(review): "discoverd" typo preserved in help text.
                help: "count of compare_and_append retries that were discoverd to have already committed",
            )),
            compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
            downgrade_since: self.cmd_metrics("downgrade_since"),
            expire_reader: self.cmd_metrics("expire_reader"),
            expire_writer: self.cmd_metrics("expire_writer"),
            merge_res: self.cmd_metrics("merge_res"),
            become_tombstone: self.cmd_metrics("become_tombstone"),
            compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
            spine_exert: self.cmd_metrics("spine_exert"),
            fetch_upper_count: registry.register(metric!(
                name: "mz_persist_cmd_fetch_upper_count",
                help: "count of fetch_upper calls",
            ))
        }
    }

    /// Binds the command metric vecs to a single command label.
    fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
        CmdMetrics {
            name: cmd.to_owned(),
            started: self.cmd_started.with_label_values(&[cmd]),
            succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
            cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
            failed: self.cmd_failed.with_label_values(&[cmd]),
            seconds: self.cmd_seconds.with_label_values(&[cmd]),
        }
    }

    /// Carves out metrics for every named retry loop in the crate.
    fn retries_metrics(&self) -> RetriesMetrics {
        RetriesMetrics {
            determinate: RetryDeterminate {
                apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
            },
            external: RetryExternal {
                // Arc-wrapped where the metrics are shared across spawned tasks.
                batch_delete: Arc::new(self.retry_metrics("batch::delete")),
                batch_set: self.retry_metrics("batch::set"),
                blob_open: self.retry_metrics("blob::open"),
                compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
                consensus_open: self.retry_metrics("consensus::open"),
                fetch_batch_get: self.retry_metrics("fetch_batch::get"),
                fetch_state_scan: self.retry_metrics("fetch_state::scan"),
                gc_truncate: self.retry_metrics("gc::truncate"),
                maybe_init_cas: self.retry_metrics("maybe_init::cas"),
                rollup_delete: self.retry_metrics("rollup::delete"),
                rollup_get: self.retry_metrics("rollup::get"),
                rollup_set: self.retry_metrics("rollup::set"),
                hollow_run_get: self.retry_metrics("hollow_run::get"),
                hollow_run_set: self.retry_metrics("hollow_run::set"),
                storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
            },
            compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
            fetch_latest_state: self.retry_metrics("fetch_latest_state"),
            fetch_live_states: self.retry_metrics("fetch_live_states"),
            idempotent_cmd: self.retry_metrics("idempotent_cmd"),
            next_listen_batch: self.retry_metrics("next_listen_batch"),
            snapshot: self.retry_metrics("snapshot"),
        }
    }

    /// Binds the retry metric vecs to a single retry-loop label.
    fn retry_metrics(&self, name: &str) -> RetryMetrics {
        RetryMetrics {
            name: name.to_owned(),
            started: self.retry_started.with_label_values(&[name]),
            finished: self.retry_finished.with_label_values(&[name]),
            retries: self.retry_retries.with_label_values(&[name]),
            sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
        }
    }

    /// Carves out encode/decode metrics per codec'd object.
    fn codecs_metrics(&self) -> CodecsMetrics {
        CodecsMetrics {
            state: self.codec_metrics("state"),
            state_diff: self.codec_metrics("state_diff"),
            batch: self.codec_metrics("batch"),
            key: self.codec_metrics("key"),
            val: self.codec_metrics("val"),
        }
    }

    /// Binds the codec metric vecs to a single op label.
    fn codec_metrics(&self, op: &str) -> CodecMetrics {
        CodecMetrics {
            encode_count: self.encode_count.with_label_values(&[op]),
            encode_seconds: self.encode_seconds.with_label_values(&[op]),
            decode_count: self.decode_count.with_label_values(&[op]),
            decode_seconds: self.decode_seconds.with_label_values(&[op]),
        }
    }

    /// Carves out per-op metrics for each Blob operation.
    fn blob_metrics(&self) -> BlobMetrics {
        BlobMetrics {
            // Latency histograms only for the perf-critical set/get paths.
            set: self.external_op_metrics("blob_set", true),
            get: self.external_op_metrics("blob_get", true),
            list_keys: self.external_op_metrics("blob_list_keys", false),
            delete: self.external_op_metrics("blob_delete", false),
            restore: self.external_op_metrics("restore", false),
            delete_noop: self.external_blob_delete_noop_count.clone(),
            blob_sizes: self.external_blob_sizes.clone(),
            rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
        }
    }

    /// Carves out per-op metrics for each Consensus operation.
    fn consensus_metrics(&self) -> ConsensusMetrics {
        ConsensusMetrics {
            list_keys: self.external_op_metrics("consensus_list_keys", false),
            head: self.external_op_metrics("consensus_head", false),
            // Latency histogram only for the perf-critical CaS path.
            compare_and_set: self.external_op_metrics("consensus_cas", true),
            scan: self.external_op_metrics("consensus_scan", false),
            truncate: self.external_op_metrics("consensus_truncate", false),
            truncated_count: self.external_consensus_truncated_count.clone(),
            rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
        }
    }

    /// Binds the external-op metric vecs to a single op label;
    /// `latency_histogram` opts the op into the (more expensive) histogram.
    fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
        ExternalOpMetrics {
            started: self.external_op_started.with_label_values(&[op]),
            succeeded: self.external_op_succeeded.with_label_values(&[op]),
            failed: self.external_op_failed.with_label_values(&[op]),
            bytes: self.external_op_bytes.with_label_values(&[op]),
            seconds: self.external_op_seconds.with_label_values(&[op]),
            seconds_histogram: if latency_histogram {
                Some(self.external_op_latency.with_label_values(&[op]))
            } else {
                None
            },
            alerts_metrics: Arc::clone(&self.alerts_metrics),
        }
    }

    /// Carves out read metrics for each batch-part read path.
    fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
        BatchPartReadMetrics {
            listen: self.read_metrics("listen"),
            snapshot: self.read_metrics("snapshot"),
            batch_fetcher: self.read_metrics("batch_fetcher"),
            compaction: self.read_metrics("compaction"),
            unindexed: self.read_metrics("unindexed"),
        }
    }

    /// Binds the read metric vecs to a single read-path label.
    fn read_metrics(&self, op: &str) -> ReadMetrics {
        ReadMetrics {
            part_bytes: self.read_part_bytes.with_label_values(&[op]),
            part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
            part_count: self.read_part_count.with_label_values(&[op]),
            seconds: self.read_part_seconds.with_label_values(&[op]),
            ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
        }
    }

    /// Carves out lock metrics for each named lock.
    fn locks_metrics(&self) -> LocksMetrics {
        LocksMetrics {
            applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
            applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
            applier_write: self.lock_metrics("applier_write"),
            watch: self.lock_metrics("watch"),
        }
    }

    /// Binds the lock metric vecs to a single lock label.
    fn lock_metrics(&self, op: &str) -> LockMetrics {
        LockMetrics {
            acquire_count: self.lock_acquire_count.with_label_values(&[op]),
            blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
            blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
        }
    }
}
586
/// Per-command lifecycle metrics, bound to one command label.
#[derive(Debug)]
pub struct CmdMetrics {
    // Command name, used for logging/labeling.
    pub(crate) name: String,
    pub(crate) started: IntCounter,
    // Retries caused by a compare-and-set mismatch.
    pub(crate) cas_mismatch: IntCounter,
    pub(crate) succeeded: IntCounter,
    pub(crate) failed: IntCounter,
    // Total wall-clock seconds spent applying this command.
    pub(crate) seconds: Counter,
}
596
597impl CmdMetrics {
598 pub async fn run_cmd<R, E, F, CmdFn>(
599 &self,
600 shard_metrics: &ShardMetrics,
601 cmd_fn: CmdFn,
602 ) -> Result<R, E>
603 where
604 F: std::future::Future<Output = Result<R, E>>,
605 CmdFn: FnOnce() -> F,
606 {
607 self.started.inc();
608 let start = Instant::now();
609 let res = cmd_fn().await;
610 self.seconds.inc_by(start.elapsed().as_secs_f64());
611 match res.as_ref() {
612 Ok(_) => {
613 self.succeeded.inc();
614 shard_metrics.cmd_succeeded.inc();
615 }
616 Err(_) => self.failed.inc(),
617 };
618 res
619 }
620}
621
/// Metrics for each state-machine command, plus standalone counters for
/// command-adjacent events.
#[derive(Debug)]
pub struct CmdsMetrics {
    pub(crate) init_state: CmdMetrics,
    pub(crate) add_rollup: CmdMetrics,
    pub(crate) remove_rollups: CmdMetrics,
    pub(crate) register: CmdMetrics,
    pub(crate) compare_and_append: CmdMetrics,
    // Retries that turned out to have already committed.
    pub(crate) compare_and_append_noop: IntCounter,
    pub(crate) compare_and_downgrade_since: CmdMetrics,
    pub(crate) downgrade_since: CmdMetrics,
    pub(crate) expire_reader: CmdMetrics,
    pub(crate) expire_writer: CmdMetrics,
    pub(crate) merge_res: CmdMetrics,
    pub(crate) become_tombstone: CmdMetrics,
    pub(crate) compare_and_evolve_schema: CmdMetrics,
    pub(crate) spine_exert: CmdMetrics,
    // fetch_upper is read-only, so it gets a plain counter, not CmdMetrics.
    pub(crate) fetch_upper_count: IntCounter,
}
640
/// Metrics for a single named retry loop.
#[derive(Debug)]
pub struct RetryMetrics {
    // Retry-loop name, used for logging/labeling.
    pub(crate) name: String,
    pub(crate) started: IntCounter,
    pub(crate) finished: IntCounter,
    // Total attempts (i.e. retries beyond the first try).
    pub(crate) retries: IntCounter,
    // Total time spent sleeping in backoff.
    pub(crate) sleep_seconds: Counter,
}
649
impl RetryMetrics {
    /// Wraps `retry` so that each attempt and backoff sleep is recorded
    /// against these metrics.
    pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
        MetricsRetryStream::new(retry, self)
    }
}
655
/// Retry metrics for operations whose errors are determinate (safe to retry
/// only because the operation is known not to have taken effect).
#[derive(Debug)]
pub struct RetryDeterminate {
    pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
}
660
/// Retry metrics for calls to external services (blob/consensus).
#[derive(Debug)]
pub struct RetryExternal {
    // Arc-wrapped entries are shared with spawned background tasks.
    pub(crate) batch_delete: Arc<RetryMetrics>,
    pub(crate) batch_set: RetryMetrics,
    pub(crate) blob_open: RetryMetrics,
    pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
    pub(crate) consensus_open: RetryMetrics,
    pub(crate) fetch_batch_get: RetryMetrics,
    pub(crate) fetch_state_scan: RetryMetrics,
    pub(crate) gc_truncate: RetryMetrics,
    pub(crate) maybe_init_cas: RetryMetrics,
    pub(crate) rollup_delete: RetryMetrics,
    pub(crate) rollup_get: RetryMetrics,
    pub(crate) rollup_set: RetryMetrics,
    pub(crate) hollow_run_get: RetryMetrics,
    pub(crate) hollow_run_set: RetryMetrics,
    pub(crate) storage_usage_shard_size: RetryMetrics,
}
679
/// All retry-loop metrics, grouped by retry kind.
#[derive(Debug)]
pub struct RetriesMetrics {
    pub(crate) determinate: RetryDeterminate,
    pub(crate) external: RetryExternal,

    pub(crate) compare_and_append_idempotent: RetryMetrics,
    pub(crate) fetch_latest_state: RetryMetrics,
    pub(crate) fetch_live_states: RetryMetrics,
    pub(crate) idempotent_cmd: RetryMetrics,
    pub(crate) next_listen_batch: RetryMetrics,
    pub(crate) snapshot: RetryMetrics,
}
692
/// Batch-part read metrics, one [ReadMetrics] per read path.
#[derive(Debug)]
pub struct BatchPartReadMetrics {
    pub(crate) listen: ReadMetrics,
    pub(crate) snapshot: ReadMetrics,
    pub(crate) batch_fetcher: ReadMetrics,
    pub(crate) compaction: ReadMetrics,
    pub(crate) unindexed: ReadMetrics,
}
701
/// Metrics for reading batch parts along one read path.
#[derive(Debug, Clone)]
pub struct ReadMetrics {
    // Encoded (on-disk) bytes read.
    pub(crate) part_bytes: IntCounter,
    // Logical (decoded) bytes read.
    pub(crate) part_goodbytes: IntCounter,
    pub(crate) part_count: IntCounter,
    pub(crate) seconds: Counter,
    // Updates whose timestamp was rewritten at read time.
    pub(crate) ts_rewrite: IntCounter,
}
710
/// Metrics for writing batches of one kind (e.g. "user", "compaction").
#[derive(Debug, Clone)]
pub struct BatchWriteMetrics {
    // Encoded bytes written.
    pub(crate) bytes: IntCounter,
    // Logical bytes written.
    pub(crate) goodbytes: IntCounter,
    pub(crate) seconds: Counter,
    // Writes that stalled waiting for outstanding-request capacity.
    pub(crate) write_stalls: IntCounter,
    // Writes that couldn't record a key lower due to the size threshold.
    pub(crate) key_lower_too_big: IntCounter,

    // Batch-ordering counters, carved out of `_order_counts` below.
    pub(crate) unordered: IntCounter,
    pub(crate) codec_order: IntCounter,
    pub(crate) structured_order: IntCounter,
    // Kept alive so the ordering counters stay registered.
    _order_counts: IntCounterVec,

    // Per-step timings of the batch write pipeline.
    pub(crate) step_stats: Counter,
    pub(crate) step_part_writing: Counter,
    pub(crate) step_inline: Counter,
}
730
impl BatchWriteMetrics {
    /// Registers batch-write metrics for the given batch kind; `name` is
    /// embedded in every metric name (e.g. "user", "compaction"), so it must
    /// be unique per registry.
    fn new(registry: &MetricsRegistry, name: &str) -> Self {
        let order_counts: IntCounterVec = registry.register(metric!(
            name: format!("mz_persist_{}_write_batch_order", name),
            help: "count of batches by the data ordering",
            var_labels: ["order"],
        ));
        let unordered = order_counts.with_label_values(&["unordered"]);
        let codec_order = order_counts.with_label_values(&["codec"]);
        let structured_order = order_counts.with_label_values(&["structured"]);

        BatchWriteMetrics {
            bytes: registry.register(metric!(
                name: format!("mz_persist_{}_bytes", name),
                help: format!("total encoded size of {} batches written", name),
            )),
            goodbytes: registry.register(metric!(
                name: format!("mz_persist_{}_goodbytes", name),
                help: format!("total logical size of {} batches written", name),
            )),
            seconds: registry.register(metric!(
                name: format!("mz_persist_{}_write_batch_part_seconds", name),
                help: format!("time spent writing {} batches", name),
            )),
            write_stalls: registry.register(metric!(
                name: format!("mz_persist_{}_write_stall_count", name),
                help: format!(
                    "count of {} writes stalling to await max outstanding reqs",
                    name
                ),
            )),
            key_lower_too_big: registry.register(metric!(
                name: format!("mz_persist_{}_key_lower_too_big", name),
                help: format!(
                    "count of {} writes that were unable to write a key lower, because the size threshold was too low",
                    name
                ),
            )),
            unordered,
            codec_order,
            structured_order,
            _order_counts: order_counts,
            step_stats: registry.register(metric!(
                name: format!("mz_persist_{}_step_stats", name),
                help: format!("time spent computing {} update stats", name),
            )),
            step_part_writing: registry.register(metric!(
                name: format!("mz_persist_{}_step_part_writing", name),
                help: format!("blocking time spent writing parts for {} updates", name),
            )),
            step_inline: registry.register(metric!(
                name: format!("mz_persist_{}_step_inline", name),
                help: format!("time spent encoding {} inline batches", name)
            )),
        }
    }
}
788
/// Metrics for compaction: request lifecycle, resource limits, and how merge
/// results were applied to state.
#[derive(Debug)]
pub struct CompactionMetrics {
    pub(crate) requested: IntCounter,
    // Dropped due to a full queue.
    pub(crate) dropped: IntCounter,
    // Dropped because compaction was disabled.
    pub(crate) disabled: IntCounter,
    // Skipped by heuristics.
    pub(crate) skipped: IntCounter,
    pub(crate) started: IntCounter,
    pub(crate) applied: IntCounter,
    pub(crate) timed_out: IntCounter,
    pub(crate) failed: IntCounter,
    // Discarded because the result was obsolete.
    pub(crate) noop: IntCounter,
    pub(crate) seconds: Counter,
    pub(crate) concurrency_waits: IntCounter,
    pub(crate) queued_seconds: Counter,
    pub(crate) memory_violations: IntCounter,
    pub(crate) runs_compacted: IntCounter,
    pub(crate) chunks_compacted: IntCounter,
    pub(crate) not_all_prefetched: IntCounter,
    pub(crate) parts_prefetched: IntCounter,
    pub(crate) parts_waited: IntCounter,
    pub(crate) fast_path_eligible: IntCounter,
    // Compactions driven by admin tooling rather than the normal path.
    pub(crate) admin_count: IntCounter,

    // How merge results matched against existing SpineBatches.
    pub(crate) applied_exact_match: IntCounter,
    pub(crate) applied_subset_match: IntCounter,
    pub(crate) not_applied_too_many_updates: IntCounter,

    // Batch-write metrics for batches produced by compaction.
    pub(crate) batch: BatchWriteMetrics,
    pub(crate) steps: CompactionStepTimings,
    pub(crate) schema_selection: CompactionSchemaSelection,

    // Kept alive so the per-step timing counters stay registered.
    pub(crate) _steps_vec: CounterVec,
}
822
impl CompactionMetrics {
    /// Registers all compaction metrics with `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        let step_timings: CounterVec = registry.register(metric!(
            name: "mz_persist_compaction_step_seconds",
            help: "time spent on individual steps of compaction",
            var_labels: ["step"],
        ));
        let schema_selection: CounterVec = registry.register(metric!(
            name: "mz_persist_compaction_schema_selection",
            help: "count of compactions and how we did schema selection",
            var_labels: ["selection"],
        ));

        CompactionMetrics {
            requested: registry.register(metric!(
                name: "mz_persist_compaction_requested",
                help: "count of total compaction requests",
            )),
            dropped: registry.register(metric!(
                name: "mz_persist_compaction_dropped",
                help: "count of total compaction requests dropped due to a full queue",
            )),
            disabled: registry.register(metric!(
                name: "mz_persist_compaction_disabled",
                help: "count of total compaction requests dropped because compaction was disabled",
            )),
            skipped: registry.register(metric!(
                name: "mz_persist_compaction_skipped",
                help: "count of compactions skipped due to heuristics",
            )),
            started: registry.register(metric!(
                name: "mz_persist_compaction_started",
                help: "count of compactions started",
            )),
            failed: registry.register(metric!(
                name: "mz_persist_compaction_failed",
                help: "count of compactions failed",
            )),
            applied: registry.register(metric!(
                name: "mz_persist_compaction_applied",
                help: "count of compactions applied to state",
            )),
            timed_out: registry.register(metric!(
                name: "mz_persist_compaction_timed_out",
                help: "count of compactions that timed out",
            )),
            noop: registry.register(metric!(
                name: "mz_persist_compaction_noop",
                help: "count of compactions discarded (obsolete)",
            )),
            seconds: registry.register(metric!(
                name: "mz_persist_compaction_seconds",
                help: "time spent in compaction",
            )),
            concurrency_waits: registry.register(metric!(
                name: "mz_persist_compaction_concurrency_waits",
                help: "count of compaction requests that ever blocked due to concurrency limit",
            )),
            queued_seconds: registry.register(metric!(
                name: "mz_persist_compaction_queued_seconds",
                help: "time that compaction requests spent queued",
            )),
            memory_violations: registry.register(metric!(
                name: "mz_persist_compaction_memory_violations",
                help: "count of compaction memory requirement violations",
            )),
            runs_compacted: registry.register(metric!(
                name: "mz_persist_compaction_runs_compacted",
                help: "count of runs compacted",
            )),
            chunks_compacted: registry.register(metric!(
                name: "mz_persist_compaction_chunks_compacted",
                help: "count of run chunks compacted",
            )),
            not_all_prefetched: registry.register(metric!(
                name: "mz_persist_compaction_not_all_prefetched",
                help: "count of compactions where not all inputs were prefetched",
            )),
            parts_prefetched: registry.register(metric!(
                name: "mz_persist_compaction_parts_prefetched",
                help: "count of compaction parts completely prefetched by the time they're needed",
            )),
            parts_waited: registry.register(metric!(
                name: "mz_persist_compaction_parts_waited",
                help: "count of compaction parts that had to be waited on",
            )),
            fast_path_eligible: registry.register(metric!(
                name: "mz_persist_compaction_fast_path_eligible",
                help: "count of compaction requests that could have used the fast-path optimization",
            )),
            admin_count: registry.register(metric!(
                name: "mz_persist_compaction_admin_count",
                help: "count of compaction requests that were performed by admin tooling",
            )),
            applied_exact_match: registry.register(metric!(
                name: "mz_persist_compaction_applied_exact_match",
                help: "count of merge results that exactly replaced a SpineBatch",
            )),
            applied_subset_match: registry.register(metric!(
                name: "mz_persist_compaction_applied_subset_match",
                help: "count of merge results that replaced a subset of a SpineBatch",
            )),
            not_applied_too_many_updates: registry.register(metric!(
                name: "mz_persist_compaction_not_applied_too_many_updates",
                help: "count of merge results that did not apply due to too many updates",
            )),
            batch: BatchWriteMetrics::new(registry, "compaction"),
            // The vecs are cloned into the carved-out structs; the originals
            // are retained in `_steps_vec` (and `schema_selection`'s clone)
            // so registration outlives this constructor.
            steps: CompactionStepTimings::new(step_timings.clone()),
            schema_selection: CompactionSchemaSelection::new(schema_selection.clone()),
            _steps_vec: step_timings,
        }
    }
}
936
/// Per-step wall-clock timings for the compaction pipeline.
#[derive(Debug)]
pub struct CompactionStepTimings {
    pub(crate) part_fetch_seconds: Counter,
    pub(crate) heap_population_seconds: Counter,
}
942
943impl CompactionStepTimings {
944 fn new(step_timings: CounterVec) -> CompactionStepTimings {
945 CompactionStepTimings {
946 part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
947 heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
948 }
949 }
950}
951
/// Counts of how the schema used for compaction was selected, as counters
/// sliced from a shared labeled `CounterVec` (see `new`).
#[derive(Debug)]
pub struct CompactionSchemaSelection {
    /// Compactions that used a recent schema (label `recent`).
    pub(crate) recent_schema: Counter,
    /// Compactions that proceeded with no schema (label `none`).
    pub(crate) no_schema: Counter,
}
957
958impl CompactionSchemaSelection {
959 fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
960 CompactionSchemaSelection {
961 recent_schema: schema_selection.with_label_values(&["recent"]),
962 no_schema: schema_selection.with_label_values(&["none"]),
963 }
964 }
965}
966
/// Metrics for persist garbage collection (help strings for each counter are
/// in [GcMetrics::new]).
#[derive(Debug)]
pub struct GcMetrics {
    // GCs skipped because the work was already done.
    pub(crate) noop: IntCounter,
    // GCs started.
    pub(crate) started: IntCounter,
    // GCs finished.
    pub(crate) finished: IntCounter,
    // GC requests merged together.
    pub(crate) merged: IntCounter,
    // Total seconds spent in GC.
    pub(crate) seconds: Counter,
    // Per-step timing breakdown.
    pub(crate) steps: GcStepTimings,
}
976
/// Wall-clock time spent in individual steps of garbage collection, as
/// cumulative seconds counters sliced from one labeled `CounterVec`
/// (`mz_persist_gc_step_seconds`; labels in [GcStepTimings::new]).
#[derive(Debug)]
pub struct GcStepTimings {
    pub(crate) find_removable_rollups: Counter,
    pub(crate) fetch_seconds: Counter,
    pub(crate) find_deletable_blobs_seconds: Counter,
    pub(crate) delete_rollup_seconds: Counter,
    pub(crate) delete_batch_part_seconds: Counter,
    pub(crate) truncate_diff_seconds: Counter,
    pub(crate) remove_rollups_from_state: Counter,
    pub(crate) post_gc_calculations_seconds: Counter,
}
988
989impl GcStepTimings {
990 fn new(step_timings: CounterVec) -> Self {
991 Self {
992 find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
993 fetch_seconds: step_timings.with_label_values(&["fetch"]),
994 find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
995 delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
996 delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
997 truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
998 remove_rollups_from_state: step_timings
999 .with_label_values(&["remove_rollups_from_state"]),
1000 post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
1001 }
1002 }
1003}
1004
1005impl GcMetrics {
1006 fn new(registry: &MetricsRegistry) -> Self {
1007 let step_timings: CounterVec = registry.register(metric!(
1008 name: "mz_persist_gc_step_seconds",
1009 help: "time spent on individual steps of gc",
1010 var_labels: ["step"],
1011 ));
1012 GcMetrics {
1013 noop: registry.register(metric!(
1014 name: "mz_persist_gc_noop",
1015 help: "count of garbage collections skipped because they were already done",
1016 )),
1017 started: registry.register(metric!(
1018 name: "mz_persist_gc_started",
1019 help: "count of garbage collections started",
1020 )),
1021 finished: registry.register(metric!(
1022 name: "mz_persist_gc_finished",
1023 help: "count of garbage collections finished",
1024 )),
1025 merged: registry.register(metric!(
1026 name: "mz_persist_gc_merged_reqs",
1027 help: "count of garbage collection requests merged",
1028 )),
1029 seconds: registry.register(metric!(
1030 name: "mz_persist_gc_seconds",
1031 help: "time spent in garbage collections",
1032 )),
1033 steps: GcStepTimings::new(step_timings),
1034 }
1035 }
1036}
1037
/// Metrics for reader leases and leased batch parts.
#[derive(Debug)]
pub struct LeaseMetrics {
    // Readers whose lease timed out.
    pub(crate) timeout_read: IntCounter,
    // LeasedBatchParts dropped without being politely returned.
    pub(crate) dropped_part: IntCounter,
}
1043
1044impl LeaseMetrics {
1045 fn new(registry: &MetricsRegistry) -> Self {
1046 LeaseMetrics {
1047 timeout_read: registry.register(metric!(
1048 name: "mz_persist_lease_timeout_read",
1049 help: "count of readers whose lease timed out",
1050 )),
1051 dropped_part: registry.register(metric!(
1052 name: "mz_persist_lease_dropped_part",
1053 help: "count of LeasedBatchParts that were dropped without being politely returned",
1054 )),
1055 }
1056 }
1057}
1058
1059struct IncOnDrop(IntCounter);
1060
impl Drop for IncOnDrop {
    fn drop(&mut self) {
        // Bump the counter as the drop side effect; used below to record
        // "finished" on retry streams without an explicit call site.
        self.0.inc()
    }
}
1066
/// A [RetryStream] wrapper that records retry counts and sleep durations
/// into metrics, and counts stream completion via drop.
pub struct MetricsRetryStream {
    retry: RetryStream,
    pub(crate) retries: IntCounter,
    sleep_seconds: Counter,
    // Increments the "finished" counter when this stream is dropped.
    _finished: IncOnDrop,
}
1073
1074impl MetricsRetryStream {
1075 pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
1076 metrics.started.inc();
1077 MetricsRetryStream {
1078 retry,
1079 retries: metrics.retries.clone(),
1080 sleep_seconds: metrics.sleep_seconds.clone(),
1081 _finished: IncOnDrop(metrics.finished.clone()),
1082 }
1083 }
1084
1085 pub fn attempt(&self) -> usize {
1087 self.retry.attempt()
1088 }
1089
1090 pub fn next_sleep(&self) -> Duration {
1092 self.retry.next_sleep()
1093 }
1094
1095 pub async fn sleep(self) -> Self {
1100 self.retries.inc();
1101 self.sleep_seconds
1102 .inc_by(self.retry.next_sleep().as_secs_f64());
1103 let retry = self.retry.sleep().await;
1104 MetricsRetryStream {
1105 retry,
1106 retries: self.retries,
1107 sleep_seconds: self.sleep_seconds,
1108 _finished: self._finished,
1109 }
1110 }
1111}
1112
1113#[derive(Debug)]
1114pub struct CodecsMetrics {
1115 pub(crate) state: CodecMetrics,
1116 pub(crate) state_diff: CodecMetrics,
1117 pub(crate) batch: CodecMetrics,
1118 pub(crate) key: CodecMetrics,
1119 pub(crate) val: CodecMetrics,
1120 }
1123
/// Count and cumulative wall time of encode/decode calls for one codec.
#[derive(Debug)]
pub struct CodecMetrics {
    pub(crate) encode_count: IntCounter,
    pub(crate) encode_seconds: Counter,
    pub(crate) decode_count: IntCounter,
    pub(crate) decode_seconds: Counter,
}
1131
1132impl CodecMetrics {
1133 pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1134 let now = Instant::now();
1135 let r = f();
1136 self.encode_count.inc();
1137 self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
1138 r
1139 }
1140
1141 pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1142 let now = Instant::now();
1143 let r = f();
1144 self.decode_count.inc();
1145 self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
1146 r
1147 }
1148}
1149
/// Metrics for operations on persist's shard state (see [StateMetrics::new]
/// for the exported name and help text of each counter).
#[derive(Debug)]
pub struct StateMetrics {
    // Which code path applied a spine diff.
    pub(crate) apply_spine_fast_path: IntCounter,
    pub(crate) apply_spine_slow_path: IntCounter,
    pub(crate) apply_spine_slow_path_lenient: IntCounter,
    pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
    pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
    pub(crate) apply_spine_flattened: IntCounter,
    // Which code path a state update took.
    pub(crate) update_state_noop_path: IntCounter,
    pub(crate) update_state_empty_path: IntCounter,
    pub(crate) update_state_fast_path: IntCounter,
    pub(crate) update_state_slow_path: IntCounter,
    pub(crate) rollup_at_seqno_migration: IntCounter,
    // Which code path fetch_recent_live_diffs took.
    pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
    pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
    // Writer registration churn.
    pub(crate) writer_added: IntCounter,
    pub(crate) writer_removed: IntCounter,
    pub(crate) force_apply_hostname: IntCounter,
    // Rollup write outcomes; the two noop counters share one labeled vec.
    pub(crate) rollup_write_success: IntCounter,
    pub(crate) rollup_write_noop_latest: IntCounter,
    pub(crate) rollup_write_noop_truncated: IntCounter,
}
1172
impl StateMetrics {
    /// Registers all state metrics with the given registry.
    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
        // Single labeled vec backing the two rollup-write no-op counters
        // sliced out at the bottom of the struct literal below.
        let rollup_write_noop: IntCounterVec = registry.register(metric!(
            name: "mz_persist_state_rollup_write_noop",
            help: "count of no-op rollup writes",
            var_labels: ["reason"],
        ));

        StateMetrics {
            apply_spine_fast_path: registry.register(metric!(
                name: "mz_persist_state_apply_spine_fast_path",
                help: "count of spine diff applications that hit the fast path",
            )),
            apply_spine_slow_path: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path",
                help: "count of spine diff applications that hit the slow path",
            )),
            apply_spine_slow_path_lenient: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path_lenient",
                help: "count of spine diff applications that hit the lenient compaction apply path",
            )),
            apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
                help: "count of adjustments made by the lenient compaction apply path",
            )),
            apply_spine_slow_path_with_reconstruction: registry.register(metric!(
                name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
                help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
            )),
            apply_spine_flattened: registry.register(metric!(
                name: "mz_persist_state_apply_spine_flattened",
                help: "count of spine diff applications that flatten the trace",
            )),
            update_state_noop_path: registry.register(metric!(
                name: "mz_persist_state_update_state_noop_path",
                help: "count of state update applications that no-oped due to shared state",
            )),
            update_state_empty_path: registry.register(metric!(
                name: "mz_persist_state_update_state_empty_path",
                help: "count of state update applications that found no new updates",
            )),
            update_state_fast_path: registry.register(metric!(
                name: "mz_persist_state_update_state_fast_path",
                help: "count of state update applications that hit the fast path",
            )),
            update_state_slow_path: registry.register(metric!(
                name: "mz_persist_state_update_state_slow_path",
                help: "count of state update applications that hit the slow path",
            )),
            rollup_at_seqno_migration: registry.register(metric!(
                name: "mz_persist_state_rollup_at_seqno_migration",
                help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
            )),
            fetch_recent_live_diffs_fast_path: registry.register(metric!(
                name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
                help: "count of fetch_recent_live_diffs that hit the fast path",
            )),
            fetch_recent_live_diffs_slow_path: registry.register(metric!(
                name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
                help: "count of fetch_recent_live_diffs that hit the slow path",
            )),
            writer_added: registry.register(metric!(
                name: "mz_persist_state_writer_added",
                help: "count of writers added to the state",
            )),
            writer_removed: registry.register(metric!(
                name: "mz_persist_state_writer_removed",
                help: "count of writers removed from the state",
            )),
            force_apply_hostname: registry.register(metric!(
                name: "mz_persist_state_force_applied_hostname",
                help: "count of when hostname diffs needed to be force applied",
            )),
            rollup_write_success: registry.register(metric!(
                name: "mz_persist_state_rollup_write_success",
                help: "count of rollups written successful (may not be linked in to state)",
            )),
            rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
            rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
        }
    }
}
1255
/// The labeled metric vecs that back per-shard metrics: [ShardMetrics] slices
/// each vec with `["shard", "name"]` (plus `"version"` for the two
/// batch-part-version vecs) labels via delete-on-drop handles.
///
/// NOTE(review): some fields spell the type `mz_ore::metrics::IntCounterVec`
/// while later ones use the bare imported `IntCounterVec`/`UIntGaugeVec`
/// paths — presumably the same types, but confirm before normalizing.
#[derive(Debug)]
pub struct ShardsMetrics {
    // Computed gauge counting live entries in `shards` below.
    _count: ComputedIntGauge,
    since: mz_ore::metrics::IntGaugeVec,
    upper: mz_ore::metrics::IntGaugeVec,
    encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
    encoded_diff_size: mz_ore::metrics::IntCounterVec,
    hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
    spine_batch_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    update_count: mz_ore::metrics::UIntGaugeVec,
    rollup_count: mz_ore::metrics::UIntGaugeVec,
    largest_batch_size: mz_ore::metrics::UIntGaugeVec,
    seqnos_held: mz_ore::metrics::UIntGaugeVec,
    seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
    gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
    gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
    gc_finished: mz_ore::metrics::IntCounterVec,
    compaction_applied: mz_ore::metrics::IntCounterVec,
    cmd_succeeded: mz_ore::metrics::IntCounterVec,
    usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
    pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
    stale_version: mz_ore::metrics::UIntGaugeVec,
    blob_gets: mz_ore::metrics::IntCounterVec,
    blob_sets: mz_ore::metrics::IntCounterVec,
    live_writers: mz_ore::metrics::UIntGaugeVec,
    unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
    backpressure_emitted_bytes: IntCounterVec,
    backpressure_last_backpressured_bytes: UIntGaugeVec,
    backpressure_retired_bytes: IntCounterVec,
    rewrite_part_count: UIntGaugeVec,
    inline_part_count: UIntGaugeVec,
    inline_part_bytes: UIntGaugeVec,
    compact_batches: UIntGaugeVec,
    compacting_batches: UIntGaugeVec,
    noncompact_batches: UIntGaugeVec,
    schema_registry_version_count: UIntGaugeVec,
    inline_backpressure_count: IntCounterVec,
    // Registry of live per-shard metric handles, keyed by shard; `Weak` so
    // dropping the last `Arc<ShardMetrics>` retires the entry (see `shard`
    // and `compute` below).
    shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
}
1310
impl ShardsMetrics {
    /// Registers every per-shard metric vec with the given registry.
    fn new(registry: &MetricsRegistry) -> Self {
        let shards = Arc::new(Mutex::new(BTreeMap::new()));
        // Second handle to the shard map, moved into the computed-gauge
        // closure below; the map itself is stored in the struct at the end.
        let shards_count = Arc::clone(&shards);
        ShardsMetrics {
            _count: registry.register_computed_gauge(
                metric!(
                    name: "mz_persist_shard_count",
                    help: "count of all active shards on this process",
                ),
                // Evaluated at scrape time: counts live (upgradable) entries.
                move || {
                    let mut ret = 0;
                    Self::compute(&shards_count, |_m| ret += 1);
                    ret
                },
            ),
            since: registry.register(metric!(
                name: "mz_persist_shard_since",
                help: "since by shard",
                var_labels: ["shard", "name"],
            )),
            upper: registry.register(metric!(
                name: "mz_persist_shard_upper",
                help: "upper by shard",
                var_labels: ["shard", "name"],
            )),
            encoded_rollup_size: registry.register(metric!(
                name: "mz_persist_shard_rollup_size_bytes",
                help: "total encoded rollup size by shard",
                var_labels: ["shard", "name"],
            )),
            encoded_diff_size: registry.register(metric!(
                name: "mz_persist_shard_diff_size_bytes",
                help: "total encoded diff size by shard",
                var_labels: ["shard", "name"],
            )),
            hollow_batch_count: registry.register(metric!(
                name: "mz_persist_shard_hollow_batch_count",
                help: "count of hollow batches by shard",
                var_labels: ["shard", "name"],
            )),
            spine_batch_count: registry.register(metric!(
                name: "mz_persist_shard_spine_batch_count",
                help: "count of spine batches by shard",
                var_labels: ["shard", "name"],
            )),
            batch_part_count: registry.register(metric!(
                name: "mz_persist_shard_batch_part_count",
                help: "count of batch parts by shard",
                var_labels: ["shard", "name"],
            )),
            batch_part_version_count: registry.register(metric!(
                name: "mz_persist_shard_batch_part_version_count",
                help: "count of batch parts by shard and version",
                var_labels: ["shard", "name", "version"],
            )),
            batch_part_version_bytes: registry.register(metric!(
                name: "mz_persist_shard_batch_part_version_bytes",
                help: "total bytes in batch parts by shard and version",
                var_labels: ["shard", "name", "version"],
            )),
            update_count: registry.register(metric!(
                name: "mz_persist_shard_update_count",
                help: "count of updates by shard",
                var_labels: ["shard", "name"],
            )),
            rollup_count: registry.register(metric!(
                name: "mz_persist_shard_rollup_count",
                help: "count of rollups by shard",
                var_labels: ["shard", "name"],
            )),
            largest_batch_size: registry.register(metric!(
                name: "mz_persist_shard_largest_batch_size",
                help: "largest encoded batch size by shard",
                var_labels: ["shard", "name"],
            )),
            seqnos_held: registry.register(metric!(
                name: "mz_persist_shard_seqnos_held",
                help: "maximum count of gc-ineligible states by shard",
                var_labels: ["shard", "name"],
            )),
            seqnos_since_last_rollup: registry.register(metric!(
                name: "mz_persist_shard_seqnos_since_last_rollup",
                help: "count of seqnos since last rollup",
                var_labels: ["shard", "name"],
            )),
            gc_seqno_held_parts: registry.register(metric!(
                name: "mz_persist_shard_gc_seqno_held_parts",
                help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
                var_labels: ["shard", "name"],
            )),
            gc_live_diffs: registry.register(metric!(
                name: "mz_persist_shard_gc_live_diffs",
                help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
                var_labels: ["shard", "name"],
            )),
            gc_finished: registry.register(metric!(
                name: "mz_persist_shard_gc_finished",
                help: "count of garbage collections finished by shard",
                var_labels: ["shard", "name"],
            )),
            compaction_applied: registry.register(metric!(
                name: "mz_persist_shard_compaction_applied",
                help: "count of compactions applied to state by shard",
                var_labels: ["shard", "name"],
            )),
            cmd_succeeded: registry.register(metric!(
                name: "mz_persist_shard_cmd_succeeded",
                help: "count of commands succeeded by shard",
                var_labels: ["shard", "name"],
            )),
            usage_current_state_batches_bytes: registry.register(metric!(
                name: "mz_persist_shard_usage_current_state_batches_bytes",
                help: "data in batches/parts referenced by current version of state",
                var_labels: ["shard", "name"],
            )),
            usage_current_state_rollups_bytes: registry.register(metric!(
                name: "mz_persist_shard_usage_current_state_rollups_bytes",
                help: "data in rollups referenced by current version of state",
                var_labels: ["shard", "name"],
            )),
            usage_referenced_not_current_state_bytes: registry.register(metric!(
                name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
                help: "data referenced only by a previous version of state",
                var_labels: ["shard", "name"],
            )),
            usage_not_leaked_not_referenced_bytes: registry.register(metric!(
                name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
                help: "data written by an active writer but not referenced by any version of state",
                var_labels: ["shard", "name"],
            )),
            usage_leaked_bytes: registry.register(metric!(
                name: "mz_persist_shard_usage_leaked_bytes",
                help: "data reclaimable by a leaked blob detector",
                var_labels: ["shard", "name"],
            )),
            pubsub_push_diff_applied: registry.register(metric!(
                name: "mz_persist_shard_pubsub_diff_applied",
                help: "number of diffs received via pubsub that applied",
                var_labels: ["shard", "name"],
            )),
            pubsub_push_diff_not_applied_stale: registry.register(metric!(
                name: "mz_persist_shard_pubsub_diff_not_applied_stale",
                help: "number of diffs received via pubsub that did not apply due to staleness",
                var_labels: ["shard", "name"],
            )),
            pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
                name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
                help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
                var_labels: ["shard", "name"],
            )),
            stale_version: registry.register(metric!(
                name: "mz_persist_shard_stale_version",
                help: "indicates whether the current version of the shard is less than the current version of the code",
                var_labels: ["shard", "name"],
            )),
            blob_gets: registry.register(metric!(
                name: "mz_persist_shard_blob_gets",
                help: "number of Blob::get calls for this shard",
                var_labels: ["shard", "name"],
            )),
            blob_sets: registry.register(metric!(
                name: "mz_persist_shard_blob_sets",
                help: "number of Blob::set calls for this shard",
                var_labels: ["shard", "name"],
            )),
            live_writers: registry.register(metric!(
                name: "mz_persist_shard_live_writers",
                help: "number of writers that have recently appended updates to this shard",
                var_labels: ["shard", "name"],
            )),
            unconsolidated_snapshot: registry.register(metric!(
                name: "mz_persist_shard_unconsolidated_snapshot",
                help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
                var_labels: ["shard", "name"],
            )),
            backpressure_emitted_bytes: registry.register(metric!(
                name: "mz_persist_backpressure_emitted_bytes",
                help: "A counter with the number of emitted bytes.",
                var_labels: ["shard", "name"],
            )),
            backpressure_last_backpressured_bytes: registry.register(metric!(
                name: "mz_persist_backpressure_last_backpressured_bytes",
                help: "The last count of bytes we are waiting to be retired in \
                    the operator. This cannot be directly compared to \
                    `retired_bytes`, but CAN indicate that backpressure is happening.",
                var_labels: ["shard", "name"],
            )),
            backpressure_retired_bytes: registry.register(metric!(
                name: "mz_persist_backpressure_retired_bytes",
                help:"A counter with the number of bytes retired by downstream processing.",
                var_labels: ["shard", "name"],
            )),
            rewrite_part_count: registry.register(metric!(
                name: "mz_persist_shard_rewrite_part_count",
                help: "count of batch parts with rewrites by shard",
                var_labels: ["shard", "name"],
            )),
            inline_part_count: registry.register(metric!(
                name: "mz_persist_shard_inline_part_count",
                help: "count of parts inline in shard metadata",
                var_labels: ["shard", "name"],
            )),
            inline_part_bytes: registry.register(metric!(
                name: "mz_persist_shard_inline_part_bytes",
                help: "total size of parts inline in shard metadata",
                var_labels: ["shard", "name"],
            )),
            compact_batches: registry.register(metric!(
                name: "mz_persist_shard_compact_batches",
                help: "number of fully compact batches in the shard",
                var_labels: ["shard", "name"],
            )),
            compacting_batches: registry.register(metric!(
                name: "mz_persist_shard_compacting_batches",
                help: "number of batches in the shard with compactions in progress",
                var_labels: ["shard", "name"],
            )),
            noncompact_batches: registry.register(metric!(
                name: "mz_persist_shard_noncompact_batches",
                help: "number of batches in the shard that aren't compact and have no ongoing compaction",
                var_labels: ["shard", "name"],
            )),
            schema_registry_version_count: registry.register(metric!(
                name: "mz_persist_shard_schema_registry_version_count",
                help: "count of versions in the schema registry",
                var_labels: ["shard", "name"],
            )),
            inline_backpressure_count: registry.register(metric!(
                name: "mz_persist_shard_inline_backpressure_count",
                help: "count of CaA attempts retried because of inline backpressure",
                var_labels: ["shard", "name"],
            )),
            shards,
        }
    }

    /// Returns the (shared) [ShardMetrics] handle for `shard_id`, creating
    /// and caching it if no live handle exists.
    pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
        let mut shards = self.shards.lock().expect("mutex poisoned");
        if let Some(shard) = shards.get(shard_id) {
            if let Some(shard) = shard.upgrade() {
                return Arc::clone(&shard);
            } else {
                // The handle died; drop the stale entry so we can re-insert.
                assert!(shards.remove(shard_id).is_some());
            }
        }
        let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
        assert!(
            shards
                .insert(shard_id.clone(), Arc::downgrade(&shard))
                .is_none()
        );
        shard
    }

    /// Calls `f` on every live [ShardMetrics], garbage-collecting entries
    /// whose handles have been dropped along the way.
    fn compute<F: FnMut(&ShardMetrics)>(
        shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
        mut f: F,
    ) {
        let mut shards = shards.lock().expect("mutex poisoned");
        let mut deleted_shards = Vec::new();
        for (shard_id, metrics) in shards.iter() {
            if let Some(metrics) = metrics.upgrade() {
                f(&metrics);
            } else {
                deleted_shards.push(shard_id.clone());
            }
        }
        for deleted_shard_id in deleted_shards {
            assert!(shards.remove(&deleted_shard_id).is_some());
        }
    }
}
1584
/// Per-shard metric handles, sliced from the vecs in [ShardsMetrics] with
/// this shard's id and name as labels. Each handle deletes its labeled
/// series when dropped.
#[derive(Debug)]
pub struct ShardMetrics {
    pub shard_id: ShardId,
    pub name: String,
    pub since: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub largest_batch_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub latest_rollup_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub encoded_diff_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub hollow_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub spine_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub batch_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Version-labeled vecs are kept whole (not pre-sliced) because the
    // "version" label value isn't known until use; per-version handles are
    // cached in `batch_part_version_map`.
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
    pub update_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub rollup_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_held: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_since_last_rollup: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_seqno_held_parts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_live_diffs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_current_state_batches_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_current_state_rollups_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_referenced_not_current_state_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_leaked_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_finished: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub compaction_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub cmd_succeeded: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_not_applied_out_of_order: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub stale_version: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub blob_gets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub blob_sets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub live_writers: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub unconsolidated_snapshot: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    // Arc-wrapped so they can be shared with the backpressure operator.
    pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    pub backpressure_last_backpressured_bytes: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
    pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    pub rewrite_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_part_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub compact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub compacting_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub noncompact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub schema_registry_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_backpressure_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
1634
1635impl ShardMetrics {
    /// Slices every vec in `shards_metrics` with this shard's id and `name`
    /// as label values, producing delete-on-drop handles.
    pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
        let shard = shard_id.to_string();
        ShardMetrics {
            shard_id: *shard_id,
            name: name.to_string(),
            since: shards_metrics
                .since
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            upper: shards_metrics
                .upper
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            latest_rollup_size: shards_metrics
                .encoded_rollup_size
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            encoded_diff_size: shards_metrics
                .encoded_diff_size
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            hollow_batch_count: shards_metrics
                .hollow_batch_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            spine_batch_count: shards_metrics
                .spine_batch_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            batch_part_count: shards_metrics
                .batch_part_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            // Version-labeled vecs are cloned whole; sliced lazily elsewhere.
            batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
            batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
            batch_part_version_map: Mutex::new(BTreeMap::new()),
            update_count: shards_metrics
                .update_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            rollup_count: shards_metrics
                .rollup_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            largest_batch_size: shards_metrics
                .largest_batch_size
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            seqnos_held: shards_metrics
                .seqnos_held
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            seqnos_since_last_rollup: shards_metrics
                .seqnos_since_last_rollup
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            gc_seqno_held_parts: shards_metrics
                .gc_seqno_held_parts
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            gc_live_diffs: shards_metrics
                .gc_live_diffs
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            gc_finished: shards_metrics
                .gc_finished
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            compaction_applied: shards_metrics
                .compaction_applied
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            cmd_succeeded: shards_metrics
                .cmd_succeeded
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            usage_current_state_batches_bytes: shards_metrics
                .usage_current_state_batches_bytes
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            usage_current_state_rollups_bytes: shards_metrics
                .usage_current_state_rollups_bytes
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            usage_referenced_not_current_state_bytes: shards_metrics
                .usage_referenced_not_current_state_bytes
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            usage_not_leaked_not_referenced_bytes: shards_metrics
                .usage_not_leaked_not_referenced_bytes
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            usage_leaked_bytes: shards_metrics
                .usage_leaked_bytes
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            pubsub_push_diff_applied: shards_metrics
                .pubsub_push_diff_applied
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            pubsub_push_diff_not_applied_stale: shards_metrics
                .pubsub_push_diff_not_applied_stale
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            pubsub_push_diff_not_applied_out_of_order: shards_metrics
                .pubsub_push_diff_not_applied_out_of_order
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            stale_version: shards_metrics
                .stale_version
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            blob_gets: shards_metrics
                .blob_gets
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            blob_sets: shards_metrics
                .blob_sets
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            live_writers: shards_metrics
                .live_writers
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            unconsolidated_snapshot: shards_metrics
                .unconsolidated_snapshot
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            backpressure_emitted_bytes: Arc::new(
                shards_metrics
                    .backpressure_emitted_bytes
                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            ),
            backpressure_last_backpressured_bytes: Arc::new(
                shards_metrics
                    .backpressure_last_backpressured_bytes
                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            ),
            backpressure_retired_bytes: Arc::new(
                shards_metrics
                    .backpressure_retired_bytes
                    .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            ),
            rewrite_part_count: shards_metrics
                .rewrite_part_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            inline_part_count: shards_metrics
                .inline_part_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            inline_part_bytes: shards_metrics
                .inline_part_bytes
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            compact_batches: shards_metrics
                .compact_batches
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            compacting_batches: shards_metrics
                .compacting_batches
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            noncompact_batches: shards_metrics
                .noncompact_batches
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            schema_registry_version_count: shards_metrics
                .schema_registry_version_count
                .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
            // Last use of `shard`, so it's moved rather than cloned.
            inline_backpressure_count: shards_metrics
                .inline_backpressure_count
                .get_delete_on_drop_metric(vec![shard, name.to_string()]),
        }
    }
1775
1776 pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
1777 self.since.set(encode_ts_metric(since))
1778 }
1779
1780 pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
1781 self.upper.set(encode_ts_metric(upper))
1782 }
1783
1784 pub(crate) fn set_batch_part_versions<'a>(
1785 &self,
1786 batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
1787 ) {
1788 let mut map = self
1789 .batch_part_version_map
1790 .lock()
1791 .expect("mutex should not be poisoned");
1792 for x in map.values() {
1799 x.batch_part_version_count.set(0);
1800 x.batch_part_version_bytes.set(0);
1801 }
1802
1803 for (key, bytes) in batch_parts_by_version {
1806 if !map.contains_key(key) {
1807 map.insert(
1808 key.to_owned(),
1809 BatchPartVersionMetrics {
1810 batch_part_version_count: self
1811 .batch_part_version_count
1812 .get_delete_on_drop_metric(vec![
1813 self.shard_id.to_string(),
1814 self.name.clone(),
1815 key.to_owned(),
1816 ]),
1817 batch_part_version_bytes: self
1818 .batch_part_version_bytes
1819 .get_delete_on_drop_metric(vec![
1820 self.shard_id.to_string(),
1821 self.name.clone(),
1822 key.to_owned(),
1823 ]),
1824 },
1825 );
1826 }
1827 let value = map.get(key).expect("inserted above");
1828 value.batch_part_version_count.inc();
1829 value.batch_part_version_bytes.add(u64::cast_from(bytes));
1830 }
1831 }
1832}
1833
/// Per-(shard, encoding-version) gauges for batch parts, deregistered from the
/// registry when dropped.
#[derive(Debug)]
pub struct BatchPartVersionMetrics {
    /// Number of batch parts written at this encoding version.
    pub batch_part_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Total encoded size in bytes of batch parts at this encoding version.
    pub batch_part_version_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
1839
/// Metrics for the persist usage audit: sizes and counts of objects found in
/// blob storage, plus timings for the audit's individual steps.
#[derive(Debug)]
pub struct UsageAuditMetrics {
    /// Total size of batch parts in blob.
    pub blob_batch_part_bytes: UIntGauge,
    /// Count of batch parts in blob.
    pub blob_batch_part_count: UIntGauge,
    /// Total size of state rollups stored in blob.
    pub blob_rollup_bytes: UIntGauge,
    /// Count of all state rollups in blob.
    pub blob_rollup_count: UIntGauge,
    /// Total size of blob.
    pub blob_bytes: UIntGauge,
    /// Count of all blobs.
    pub blob_count: UIntGauge,
    /// Seconds spent in the `blob_metadata` step of the audit.
    pub step_blob_metadata: Counter,
    /// Seconds spent in the `state` step of the audit.
    pub step_state: Counter,
    /// Seconds spent in the `math` step of the audit.
    pub step_math: Counter,
}
1862
1863impl UsageAuditMetrics {
1864 fn new(registry: &MetricsRegistry) -> Self {
1865 let step_timings: CounterVec = registry.register(metric!(
1866 name: "mz_persist_audit_step_seconds",
1867 help: "time spent on individual steps of audit",
1868 var_labels: ["step"],
1869 ));
1870 UsageAuditMetrics {
1871 blob_batch_part_bytes: registry.register(metric!(
1872 name: "mz_persist_audit_blob_batch_part_bytes",
1873 help: "total size of batch parts in blob",
1874 )),
1875 blob_batch_part_count: registry.register(metric!(
1876 name: "mz_persist_audit_blob_batch_part_count",
1877 help: "count of batch parts in blob",
1878 )),
1879 blob_rollup_bytes: registry.register(metric!(
1880 name: "mz_persist_audit_blob_rollup_bytes",
1881 help: "total size of state rollups stored in blob",
1882 )),
1883 blob_rollup_count: registry.register(metric!(
1884 name: "mz_persist_audit_blob_rollup_count",
1885 help: "count of all state rollups in blob",
1886 )),
1887 blob_bytes: registry.register(metric!(
1888 name: "mz_persist_audit_blob_bytes",
1889 help: "total size of blob",
1890 )),
1891 blob_count: registry.register(metric!(
1892 name: "mz_persist_audit_blob_count",
1893 help: "count of all blobs",
1894 )),
1895 step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
1896 step_state: step_timings.with_label_values(&["state"]),
1897 step_math: step_timings.with_label_values(&["math"]),
1898 }
1899 }
1900}
1901
1902#[derive(Debug)]
1905pub enum UpdateDelta {
1906 Negative(u64),
1908 NonNegative(u64),
1910}
1911
1912impl UpdateDelta {
1913 pub fn new(new: usize, old: usize) -> Self {
1916 if new < old {
1917 UpdateDelta::Negative(CastFrom::cast_from(old - new))
1918 } else {
1919 UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
1920 }
1921 }
1922}
1923
/// Metrics for the persist sink's correction buffer, aggregated across workers
/// and persist sinks.
#[derive(Debug, Clone)]
pub struct SinkMetrics {
    /// Cumulative insertions observed on the correction buffer.
    correction_insertions_total: IntCounter,
    /// Cumulative deletions observed on the correction buffer.
    correction_deletions_total: IntCounter,
    /// Cumulative capacity increases observed on the correction buffer.
    correction_capacity_increases_total: IntCounter,
    /// Cumulative capacity decreases observed on the correction buffer.
    correction_capacity_decreases_total: IntCounter,
    /// Maximum correction-buffer length of any single sink, per worker
    /// (`worker_id` label).
    correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
    /// Maximum correction-buffer capacity of any single sink, per worker
    /// (`worker_id` label).
    correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
}
1941
impl SinkMetrics {
    /// Registers all sink correction-buffer metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        SinkMetrics {
            correction_insertions_total: registry.register(metric!(
                name: "mz_persist_sink_correction_insertions_total",
                help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
            )),
            correction_deletions_total: registry.register(metric!(
                name: "mz_persist_sink_correction_deletions_total",
                help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
            )),
            correction_capacity_increases_total: registry.register(metric!(
                name: "mz_persist_sink_correction_capacity_increases_total",
                help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
            )),
            correction_capacity_decreases_total: registry.register(metric!(
                name: "mz_persist_sink_correction_capacity_decreases_total",
                help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
            )),
            correction_max_per_sink_worker_len_updates: registry.register(metric!(
                name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
                help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
                var_labels: ["worker_id"],
            )),
            correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
                name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
                help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
                var_labels: ["worker_id"],
            )),
        }
    }

    /// Returns the per-worker maximum gauges, bound to `worker_id`'s label.
    pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
        let worker = worker_id.to_string();
        let correction_max_per_sink_worker_len_updates = self
            .correction_max_per_sink_worker_len_updates
            .with_label_values(&[&worker]);
        let correction_max_per_sink_worker_capacity_updates = self
            .correction_max_per_sink_worker_capacity_updates
            .with_label_values(&[&worker]);
        SinkWorkerMetrics {
            correction_max_per_sink_worker_len_updates,
            correction_max_per_sink_worker_capacity_updates,
        }
    }

    /// Folds length and capacity deltas of a correction buffer into the
    /// cumulative insertion/deletion and capacity increase/decrease counters.
    pub fn report_correction_update_deltas(
        &self,
        correction_len_delta: UpdateDelta,
        correction_cap_delta: UpdateDelta,
    ) {
        // A non-negative delta of zero means "unchanged"; skip the no-op
        // counter update in that case.
        match correction_len_delta {
            UpdateDelta::NonNegative(delta) => {
                if delta > 0 {
                    self.correction_insertions_total.inc_by(delta)
                }
            }
            UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
        }
        // Same treatment for the capacity delta.
        match correction_cap_delta {
            UpdateDelta::NonNegative(delta) => {
                if delta > 0 {
                    self.correction_capacity_increases_total.inc_by(delta)
                }
            }
            UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
        }
    }
}
2022
/// Per-worker handles on the sink correction-buffer maximum gauges, with the
/// `worker_id` label already bound.
#[derive(Clone, Debug)]
pub struct SinkWorkerMetrics {
    /// Maximum correction-buffer length observed for any sink on this worker.
    correction_max_per_sink_worker_len_updates: UIntGauge,
    /// Maximum correction-buffer capacity observed for any sink on this worker.
    correction_max_per_sink_worker_capacity_updates: UIntGauge,
}
2029
2030impl SinkWorkerMetrics {
2031 pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
2036 let correction_len = CastFrom::cast_from(correction_len);
2038 if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
2039 self.correction_max_per_sink_worker_len_updates
2040 .set(correction_len);
2041 }
2042 let correction_cap = CastFrom::cast_from(correction_cap);
2043 if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
2044 self.correction_max_per_sink_worker_capacity_updates
2045 .set(correction_cap);
2046 }
2047 }
2048}
2049
/// Failure counters exported (via the `honeycomb` const label) for alerting.
#[derive(Debug)]
pub struct AlertsMetrics {
    /// Count of all blob operation failures.
    pub(crate) blob_failures: IntCounter,
    /// Count of determinate consensus operation failures.
    pub(crate) consensus_failures: IntCounter,
}
2056
2057impl AlertsMetrics {
2058 fn new(registry: &MetricsRegistry) -> Self {
2059 AlertsMetrics {
2060 blob_failures: registry.register(metric!(
2061 name: "mz_persist_blob_failures",
2062 help: "count of all blob operation failures",
2063 const_labels: {"honeycomb" => "import"},
2064 )),
2065 consensus_failures: registry.register(metric!(
2066 name: "mz_persist_consensus_failures",
2067 help: "count of determinate consensus operation failures",
2068 const_labels: {"honeycomb" => "import"},
2069 )),
2070 }
2071 }
2072}
2073
/// Metrics for the persist pubsub server.
#[derive(Debug)]
pub struct PubSubServerMetrics {
    /// Number of active connections to the server.
    pub(crate) active_connections: UIntGauge,
    /// Count of broadcast diff messages sent.
    pub(crate) broadcasted_diff_count: IntCounter,
    /// Count of broadcast diff bytes sent.
    pub(crate) broadcasted_diff_bytes: IntCounter,
    /// Count of diffs dropped because a connection channel was full.
    pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,

    /// Seconds spent handling `push` operations.
    pub(crate) push_seconds: Counter,
    /// Seconds spent handling `subscribe` operations.
    pub(crate) subscribe_seconds: Counter,
    /// Seconds spent handling `unsubscribe` operations.
    pub(crate) unsubscribe_seconds: Counter,
    /// Seconds spent cleaning up connections.
    pub(crate) connection_cleanup_seconds: Counter,

    /// Count of `push` messages received.
    pub(crate) push_call_count: IntCounter,
    /// Count of `subscribe` messages received.
    pub(crate) subscribe_call_count: IntCounter,
    /// Count of `unsubscribe` messages received.
    pub(crate) unsubscribe_call_count: IntCounter,
}
2091
impl PubSubServerMetrics {
    /// Registers all pubsub server metrics against `registry`.
    pub(crate) fn new(registry: &MetricsRegistry) -> Self {
        // Shared counter families split by the `op` / `call` labels below.
        let op_timings: CounterVec = registry.register(metric!(
            name: "mz_persist_pubsub_server_operation_seconds",
            help: "time spent in pubsub server performing each operation",
            var_labels: ["op"],
        ));
        let call_count: IntCounterVec = registry.register(metric!(
            name: "mz_persist_pubsub_server_call_count",
            help: "count of each pubsub server message received",
            var_labels: ["call"],
        ));

        Self {
            active_connections: registry.register(metric!(
                name: "mz_persist_pubsub_server_active_connections",
                help: "number of active connections to server",
            )),
            broadcasted_diff_count: registry.register(metric!(
                name: "mz_persist_pubsub_server_broadcasted_diff_count",
                help: "count of total broadcast diff messages sent",
            )),
            broadcasted_diff_bytes: registry.register(metric!(
                name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
                help: "count of total broadcast diff bytes sent",
            )),
            broadcasted_diff_dropped_channel_full: registry.register(metric!(
                name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
                help: "count of diffs dropped due to full connection channel",
            )),

            push_seconds: op_timings.with_label_values(&["push"]),
            subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
            unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
            connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),

            push_call_count: call_count.with_label_values(&["push"]),
            subscribe_call_count: call_count.with_label_values(&["subscribe"]),
            unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
        }
    }
}
2134
/// Metrics for the persist pubsub client, grouped by role.
#[derive(Debug)]
pub struct PubSubClientMetrics {
    /// Metrics for calls sent by the client.
    pub sender: PubSubClientSenderMetrics,
    /// Metrics for messages received by the client.
    pub receiver: PubSubClientReceiverMetrics,
    /// Metrics for the underlying grpc connection.
    pub grpc_connection: PubSubGrpcClientConnectionMetrics,
}
2142
2143impl PubSubClientMetrics {
2144 fn new(registry: &MetricsRegistry) -> Self {
2145 PubSubClientMetrics {
2146 sender: PubSubClientSenderMetrics::new(registry),
2147 receiver: PubSubClientReceiverMetrics::new(registry),
2148 grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
2149 }
2150 }
2151}
2152
/// Metrics for the pubsub client's grpc connection lifecycle.
#[derive(Debug)]
pub struct PubSubGrpcClientConnectionMetrics {
    /// Whether the grpc client is currently connected.
    pub(crate) connected: UIntGauge,
    /// Count of grpc connection establishments to the pubsub server.
    pub(crate) connection_established_count: IntCounter,
    /// Count of connection call attempts, including retries.
    pub(crate) connect_call_attempt_count: IntCounter,
    /// Times a message was missed by the broadcast receiver due to lag.
    pub(crate) broadcast_recv_lagged_count: IntCounter,
    /// Count of grpc errors received.
    pub(crate) grpc_error_count: IntCounter,
}
2161
impl PubSubGrpcClientConnectionMetrics {
    /// Registers the grpc connection metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        Self {
            connected: registry.register(metric!(
                name: "mz_persist_pubsub_client_grpc_connected",
                help: "whether the grpc client is currently connected",
            )),
            connection_established_count: registry.register(metric!(
                name: "mz_persist_pubsub_client_grpc_connection_established_count",
                help: "count of grpc connection establishments to pubsub server",
            )),
            connect_call_attempt_count: registry.register(metric!(
                name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
                help: "count of connection call attempts (including retries) to pubsub server",
            )),
            broadcast_recv_lagged_count: registry.register(metric!(
                name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
                help: "times a message was missed by broadcast receiver due to lag",
            )),
            grpc_error_count: registry.register(metric!(
                name: "mz_persist_pubsub_client_grpc_error_count",
                help: "count of grpc errors received",
            )),
        }
    }
}
2188
/// Metrics for messages and state diffs received by the pubsub client.
#[derive(Clone, Debug)]
pub struct PubSubClientReceiverMetrics {
    /// Count of `push` calls received.
    pub(crate) push_received: IntCounter,
    /// Count of unrecognized calls received.
    pub(crate) unknown_message_received: IntCounter,
    /// Approximate latency between a diff being sent and applied.
    pub(crate) approx_diff_latency_seconds: Histogram,

    /// Count of fast-path state `push_diff` calls.
    pub(crate) state_pushed_diff_fast_path: IntCounter,
    /// Count of successful slow-path state `push_diff` calls.
    pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
    /// Count of unsuccessful slow-path state `push_diff` calls.
    pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
}
2199
impl PubSubClientReceiverMetrics {
    /// Registers the pubsub client receiver metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        // Shared counter family split by the `call` label.
        let call_received: IntCounterVec = registry.register(metric!(
            name: "mz_persist_pubsub_client_call_received",
            help: "times a pubsub client call was received",
            var_labels: ["call"],
        ));

        Self {
            push_received: call_received.with_label_values(&["push"]),
            unknown_message_received: call_received.with_label_values(&["unknown"]),
            approx_diff_latency_seconds: registry.register(metric!(
                name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
                help: "histogram of (approximate) latency between sending a diff and applying it",
                // 1ms .. ~8s, doubling each bucket.
                buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
            )),

            state_pushed_diff_fast_path: registry.register(metric!(
                name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
                help: "count fast-path state push_diff calls",
            )),
            state_pushed_diff_slow_path_succeeded: registry.register(metric!(
                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
                help: "count of successful slow-path state push_diff calls",
            )),
            state_pushed_diff_slow_path_failed: registry.register(metric!(
                name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
                help: "count of unsuccessful slow-path state push_diff calls",
            )),
        }
    }
}
2232
/// Per-call metrics for messages sent by the pubsub client.
#[derive(Debug)]
pub struct PubSubClientSenderMetrics {
    /// Metrics for `push` calls.
    pub push: PubSubClientCallMetrics,
    /// Metrics for `subscribe` calls.
    pub subscribe: PubSubClientCallMetrics,
    /// Metrics for `unsubscribe` calls.
    pub unsubscribe: PubSubClientCallMetrics,
}
2239
/// Outcome counters for a single kind of pubsub client call.
#[derive(Debug)]
pub struct PubSubClientCallMetrics {
    /// Times the call succeeded.
    pub(crate) succeeded: IntCounter,
    /// Bytes sent for the call.
    pub(crate) bytes_sent: IntCounter,
    /// Times the call failed.
    pub(crate) failed: IntCounter,
}
2246
2247impl PubSubClientSenderMetrics {
2248 fn new(registry: &MetricsRegistry) -> Self {
2249 let call_bytes_sent: IntCounterVec = registry.register(metric!(
2250 name: "mz_persist_pubsub_client_call_bytes_sent",
2251 help: "number of bytes sent for a given pubsub client call",
2252 var_labels: ["call"],
2253 ));
2254 let call_succeeded: IntCounterVec = registry.register(metric!(
2255 name: "mz_persist_pubsub_client_call_succeeded",
2256 help: "times a pubsub client call succeeded",
2257 var_labels: ["call"],
2258 ));
2259 let call_failed: IntCounterVec = registry.register(metric!(
2260 name: "mz_persist_pubsub_client_call_failed",
2261 help: "times a pubsub client call failed",
2262 var_labels: ["call"],
2263 ));
2264
2265 Self {
2266 push: PubSubClientCallMetrics {
2267 succeeded: call_succeeded.with_label_values(&["push"]),
2268 failed: call_failed.with_label_values(&["push"]),
2269 bytes_sent: call_bytes_sent.with_label_values(&["push"]),
2270 },
2271 subscribe: PubSubClientCallMetrics {
2272 succeeded: call_succeeded.with_label_values(&["subscribe"]),
2273 failed: call_failed.with_label_values(&["subscribe"]),
2274 bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
2275 },
2276 unsubscribe: PubSubClientCallMetrics {
2277 succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
2278 failed: call_failed.with_label_values(&["unsubscribe"]),
2279 bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
2280 },
2281 }
2282 }
2283}
2284
/// Lock contention metrics, one [`LockMetrics`] per persist-internal lock.
#[derive(Debug)]
pub struct LocksMetrics {
    /// Applier read lock, cacheable path.
    pub(crate) applier_read_cacheable: LockMetrics,
    /// Applier read lock, non-cacheable path.
    pub(crate) applier_read_noncacheable: LockMetrics,
    /// Applier write lock.
    pub(crate) applier_write: LockMetrics,
    /// State watch lock.
    pub(crate) watch: LockMetrics,
}
2292
/// Contention metrics for a single lock.
#[derive(Debug, Clone)]
pub struct LockMetrics {
    /// Count of acquisitions.
    pub(crate) acquire_count: IntCounter,
    /// Count of acquisitions that had to block.
    pub(crate) blocking_acquire_count: IntCounter,
    /// Total seconds spent blocked on acquisition.
    pub(crate) blocking_seconds: Counter,
}
2299
/// Metrics for the state-watch notification machinery used by listens and
/// snapshots.
#[derive(Debug)]
pub struct WatchMetrics {
    /// Listen next-batches wakes via watch notify.
    pub(crate) listen_woken_via_watch: IntCounter,
    /// Listen next-batches wakes via sleep.
    pub(crate) listen_woken_via_sleep: IntCounter,
    /// Listen next-batches resolved via watch notify.
    pub(crate) listen_resolved_via_watch: IntCounter,
    /// Listen next-batches resolved via sleep.
    pub(crate) listen_resolved_via_sleep: IntCounter,
    /// Snapshot wakes via watch notify.
    pub(crate) snapshot_woken_via_watch: IntCounter,
    /// Snapshot wakes via sleep.
    pub(crate) snapshot_woken_via_sleep: IntCounter,
    /// Notifications sent to a non-empty broadcast channel.
    pub(crate) notify_sent: IntCounter,
    /// Notifications sent to an empty broadcast channel (no-ops).
    pub(crate) notify_noop: IntCounter,
    /// Notifications received from the broadcast channel.
    pub(crate) notify_recv: IntCounter,
    /// Lagged events in the notification broadcast channel.
    pub(crate) notify_lagged: IntCounter,
    /// Watch wait calls started.
    pub(crate) notify_wait_started: IntCounter,
    /// Watch wait calls resolved.
    pub(crate) notify_wait_finished: IntCounter,
}
2315
2316impl WatchMetrics {
2317 fn new(registry: &MetricsRegistry) -> Self {
2318 WatchMetrics {
2319 listen_woken_via_watch: registry.register(metric!(
2320 name: "mz_persist_listen_woken_via_watch",
2321 help: "count of listen next batches wakes via watch notify",
2322 )),
2323 listen_woken_via_sleep: registry.register(metric!(
2324 name: "mz_persist_listen_woken_via_sleep",
2325 help: "count of listen next batches wakes via sleep",
2326 )),
2327 listen_resolved_via_watch: registry.register(metric!(
2328 name: "mz_persist_listen_resolved_via_watch",
2329 help: "count of listen next batches resolved via watch notify",
2330 )),
2331 listen_resolved_via_sleep: registry.register(metric!(
2332 name: "mz_persist_listen_resolved_via_sleep",
2333 help: "count of listen next batches resolved via sleep",
2334 )),
2335 snapshot_woken_via_watch: registry.register(metric!(
2336 name: "mz_persist_snapshot_woken_via_watch",
2337 help: "count of snapshot wakes via watch notify",
2338 )),
2339 snapshot_woken_via_sleep: registry.register(metric!(
2340 name: "mz_persist_snapshot_woken_via_sleep",
2341 help: "count of snapshot wakes via sleep",
2342 )),
2343 notify_sent: registry.register(metric!(
2344 name: "mz_persist_watch_notify_sent",
2345 help: "count of watch notifications sent to a non-empty broadcast channel",
2346 )),
2347 notify_noop: registry.register(metric!(
2348 name: "mz_persist_watch_notify_noop",
2349 help: "count of watch notifications sent to an broadcast channel",
2350 )),
2351 notify_recv: registry.register(metric!(
2352 name: "mz_persist_watch_notify_recv",
2353 help: "count of watch notifications received from the broadcast channel",
2354 )),
2355 notify_lagged: registry.register(metric!(
2356 name: "mz_persist_watch_notify_lagged",
2357 help: "count of lagged events in the watch notification broadcast channel",
2358 )),
2359 notify_wait_started: registry.register(metric!(
2360 name: "mz_persist_watch_notify_wait_started",
2361 help: "count of watch wait calls started",
2362 )),
2363 notify_wait_finished: registry.register(metric!(
2364 name: "mz_persist_watch_notify_wait_finished",
2365 help: "count of watch wait calls resolved",
2366 )),
2367 }
2368 }
2369}
2370
/// Metrics for filter/projection pushdown over batch parts.
#[derive(Debug)]
pub struct PushdownMetrics {
    /// Count of parts filtered by pushdown.
    pub(crate) parts_filtered_count: IntCounter,
    /// Total size of parts filtered by pushdown.
    pub(crate) parts_filtered_bytes: IntCounter,
    /// Count of parts not filtered by pushdown.
    pub(crate) parts_fetched_count: IntCounter,
    /// Total size of parts not filtered by pushdown.
    pub(crate) parts_fetched_bytes: IntCounter,
    /// Count of parts fetched only for pushdown audit.
    pub(crate) parts_audited_count: IntCounter,
    /// Total size of parts fetched only for pushdown audit.
    pub(crate) parts_audited_bytes: IntCounter,
    /// Count of parts not fetched because they were inline.
    pub(crate) parts_inline_count: IntCounter,
    /// Total size of parts not fetched because they were inline.
    pub(crate) parts_inline_bytes: IntCounter,
    /// Count of parts faked by aggressive projection pushdown.
    pub(crate) parts_faked_count: IntCounter,
    /// Total size of parts replaced with fakes by aggressive projection pushdown.
    pub(crate) parts_faked_bytes: IntCounter,
    /// Count of trimmed part stats.
    pub(crate) parts_stats_trimmed_count: IntCounter,
    /// Total bytes trimmed from part stats.
    pub(crate) parts_stats_trimmed_bytes: IntCounter,
    /// Total bytes trimmed from columnar data by projection pushdown.
    pub(crate) parts_projection_trimmed_bytes: IntCounter,
    /// Metrics for part statistics themselves.
    pub part_stats: PartStatsMetrics,
}
2388
impl PushdownMetrics {
    /// Registers all pushdown metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        PushdownMetrics {
            parts_filtered_count: registry.register(metric!(
                name: "mz_persist_pushdown_parts_filtered_count",
                help: "count of parts filtered by pushdown",
            )),
            parts_filtered_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_filtered_bytes",
                help: "total size of parts filtered by pushdown in bytes",
            )),
            parts_fetched_count: registry.register(metric!(
                name: "mz_persist_pushdown_parts_fetched_count",
                help: "count of parts not filtered by pushdown",
            )),
            parts_fetched_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_fetched_bytes",
                help: "total size of parts not filtered by pushdown in bytes",
            )),
            parts_audited_count: registry.register(metric!(
                name: "mz_persist_pushdown_parts_audited_count",
                help: "count of parts fetched only for pushdown audit",
            )),
            parts_audited_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_audited_bytes",
                help: "total size of parts fetched only for pushdown audit",
            )),
            parts_inline_count: registry.register(metric!(
                name: "mz_persist_pushdown_parts_inline_count",
                help: "count of parts not fetched because they were inline",
            )),
            parts_inline_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_inline_bytes",
                help: "total size of parts not fetched because they were inline",
            )),
            parts_faked_count: registry.register(metric!(
                name: "mz_persist_pushdown_parts_faked_count",
                help: "count of parts faked because of aggressive projection pushdown",
            )),
            parts_faked_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_faked_bytes",
                help: "total size of parts replaced with fakes by aggressive projection pushdown",
            )),
            parts_stats_trimmed_count: registry.register(metric!(
                name: "mz_persist_pushdown_parts_stats_trimmed_count",
                help: "count of trimmed part stats",
            )),
            parts_stats_trimmed_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
                help: "total bytes trimmed from part stats",
            )),
            parts_projection_trimmed_bytes: registry.register(metric!(
                name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
                help: "total bytes trimmed from columnar data because of projection pushdown",
            )),
            part_stats: PartStatsMetrics::new(registry),
        }
    }
}
2448
/// Metrics for part usage during consolidation.
#[derive(Debug)]
pub struct ConsolidationMetrics {
    /// Parts that were fetched and used.
    pub(crate) parts_fetched: IntCounter,
    /// Parts that were never needed.
    pub(crate) parts_skipped: IntCounter,
    /// Parts that were fetched but not needed.
    pub(crate) parts_wasted: IntCounter,
    /// Runs sorted with the wrong ordering for the current consolidation.
    pub(crate) wrong_sort: IntCounter,
}
2456
impl ConsolidationMetrics {
    /// Registers all consolidation metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        ConsolidationMetrics {
            parts_fetched: registry.register(metric!(
                name: "mz_persist_consolidation_parts_fetched_count",
                help: "count of parts that were fetched and used during consolidation",
            )),
            parts_skipped: registry.register(metric!(
                name: "mz_persist_consolidation_parts_skipped_count",
                help: "count of parts that were never needed during consolidation",
            )),
            parts_wasted: registry.register(metric!(
                name: "mz_persist_consolidation_parts_wasted_count",
                help: "count of parts that were fetched but not needed during consolidation",
            )),
            wrong_sort: registry.register(metric!(
                name: "mz_persist_consolidation_wrong_sort_count",
                help: "count of runs that were sorted using the wrong ordering for the current consolidation",
            )),
        }
    }
}
2479
/// Metrics for the in-memory blob cache (all carry the `cache => mem` label).
#[derive(Debug)]
pub struct BlobMemCache {
    /// Count of blobs currently in the cache.
    pub(crate) size_blobs: UIntGauge,
    /// Total size of blobs currently in the cache.
    pub(crate) size_bytes: UIntGauge,
    /// Count of blobs served from cache instead of s3.
    pub(crate) hits_blobs: IntCounter,
    /// Total size of blobs served from cache instead of s3.
    pub(crate) hits_bytes: IntCounter,
    /// Count of capacity-based evictions.
    pub(crate) evictions: IntCounter,
}
2488
impl BlobMemCache {
    /// Registers the blob cache metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        BlobMemCache {
            size_blobs: registry.register(metric!(
                name: "mz_persist_blob_cache_size_blobs",
                help: "count of blobs in the cache",
                const_labels: {"cache" => "mem"},
            )),
            size_bytes: registry.register(metric!(
                name: "mz_persist_blob_cache_size_bytes",
                help: "total size of blobs in the cache",
                const_labels: {"cache" => "mem"},
            )),
            hits_blobs: registry.register(metric!(
                name: "mz_persist_blob_cache_hits_blobs",
                help: "count of blobs served via cache instead of s3",
                const_labels: {"cache" => "mem"},
            )),
            hits_bytes: registry.register(metric!(
                name: "mz_persist_blob_cache_hits_bytes",
                help: "total size of blobs served via cache instead of s3",
                const_labels: {"cache" => "mem"},
            )),
            evictions: registry.register(metric!(
                name: "mz_persist_blob_cache_evictions",
                help: "count of capacity-based cache evictions",
                const_labels: {"cache" => "mem"},
            )),
        }
    }
}
2520
/// Lazily-initialized semaphores used to bound persist resource usage.
#[derive(Debug)]
pub struct SemaphoreMetrics {
    /// Config used to size the semaphores.
    cfg: PersistConfig,
    /// Registry the semaphores register their metrics against.
    registry: MetricsRegistry,
    /// The fetch semaphore, initialized on first use (its permit count depends
    /// on dyncfg values that may not be available at construction time).
    fetch: OnceCell<MetricsSemaphore>,
}
2527
impl SemaphoreMetrics {
    /// Creates the container; the fetch semaphore itself is built on first use.
    fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
        SemaphoreMetrics {
            cfg,
            registry,
            fetch: OnceCell::new(),
        }
    }

    /// Returns the fetch semaphore, initializing it on first call.
    async fn fetch(&self) -> &MetricsSemaphore {
        // Fast path: already initialized, no async work needed.
        if let Some(x) = self.fetch.get() {
            return x;
        }
        let cfg = self.cfg.clone();
        let registry = self.registry.clone();
        let init = async move {
            let total_permits = match cfg.announce_memory_limit {
                // Only bound permits when a memory limit is announced and cc
                // is active; sizing needs the first dyncfg sync to complete.
                Some(mem) if cfg.is_cc_active => {
                    info!("fetch semaphore awaiting first dyncfg values");
                    let () = cfg.configs_synced_once().await;
                    let total_permits = usize::cast_lossy(
                        f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
                    );
                    info!("fetch_semaphore got first dyncfg values");
                    total_permits
                }
                // Otherwise the semaphore is effectively unlimited.
                Some(_) | None => Semaphore::MAX_PERMITS,
            };
            MetricsSemaphore::new(&registry, "fetch", total_permits)
        };
        // OnceCell guarantees at most one initializer runs; racers await it.
        self.fetch.get_or_init(|| init).await
    }

    /// Acquires fetch permits proportional to `encoded_size_bytes`, scaled by
    /// the dyncfg cost adjustment.
    pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
        let requested_permits = f64::cast_lossy(encoded_size_bytes);
        let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
        let requested_permits = usize::cast_lossy(requested_permits);
        self.fetch().await.acquire_permits(requested_permits).await
    }
}
2578
/// A [`Semaphore`] instrumented with acquisition and blocking metrics.
#[derive(Debug)]
pub struct MetricsSemaphore {
    /// Name used in const labels and log messages.
    name: &'static str,
    /// The underlying semaphore.
    semaphore: Arc<Semaphore>,
    /// The semaphore's size; requests are clamped to this.
    total_permits: usize,
    /// Count of acquire calls (not acquired permit count).
    acquire_count: IntCounter,
    /// Count of acquire calls that had to block.
    blocking_count: IntCounter,
    /// Total seconds spent blocked on acquisition.
    blocking_seconds: Counter,
    /// Running sum of acquired permits.
    acquired_permits: IntCounter,
    /// Running sum of released permits.
    released_permits: IntCounter,
    /// Computed gauge reporting the semaphore's currently available permits.
    _available_permits: ComputedUIntGauge,
}
2591
impl MetricsSemaphore {
    /// Creates a semaphore of `total_permits` (clamped to the tokio maximum)
    /// and registers its metrics against `registry` under the `name` label.
    pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
        // Semaphore::new panics above MAX_PERMITS, so clamp first.
        let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
        let semaphore = Arc::new(Semaphore::new(total_permits));
        MetricsSemaphore {
            name,
            total_permits,
            acquire_count: registry.register(metric!(
                name: "mz_persist_semaphore_acquire_count",
                help: "count of acquire calls (not acquired permits count)",
                const_labels: {"name" => name},
            )),
            blocking_count: registry.register(metric!(
                name: "mz_persist_semaphore_blocking_count",
                help: "count of acquire calls that had to block",
                const_labels: {"name" => name},
            )),
            blocking_seconds: registry.register(metric!(
                name: "mz_persist_semaphore_blocking_seconds",
                help: "total time spent blocking on permit acquisition",
                const_labels: {"name" => name},
            )),
            acquired_permits: registry.register(metric!(
                name: "mz_persist_semaphore_acquired_permits",
                help: "total sum of acquired permits",
                const_labels: {"name" => name},
            )),
            released_permits: registry.register(metric!(
                name: "mz_persist_semaphore_released_permits",
                help: "total sum of released permits",
                const_labels: {"name" => name},
            )),
            // The gauge reads the semaphore directly at scrape time.
            _available_permits: registry.register_computed_gauge(
                metric!(
                    name: "mz_persist_semaphore_available_permits",
                    help: "currently available permits according to the semaphore",
                ),
                {
                    let semaphore = Arc::clone(&semaphore);
                    move || u64::cast_from(semaphore.available_permits())
                },
            ),
            semaphore,
        }
    }

    /// Acquires `requested_permits` (clamped to the semaphore's size so that
    /// an over-sized request can still complete), recording counts and time
    /// spent blocked.
    pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
        let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
        let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
        let requested_permits = std::cmp::min(requested_permits, total_permits);
        // Wraps an acquired permit, recording the acquisition and arranging
        // for the release to be recorded on drop.
        let wrap = |_permit| {
            self.acquired_permits.inc_by(u64::from(requested_permits));
            MetricsPermits {
                _permit,
                released_metric: self.released_permits.clone(),
                count: requested_permits,
            }
        };

        self.acquire_count.inc();
        // Fast path: permits available without blocking.
        match Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
            Ok(x) => return wrap(x),
            Err(_) => {}
        };

        // Slow path: block on acquisition, timing how long it takes.
        self.blocking_count.inc();
        let start = Instant::now();
        let ret = Arc::clone(&self.semaphore)
            .acquire_many_owned(requested_permits)
            .instrument(info_span!("acquire_permits"))
            .await;
        let elapsed = start.elapsed();
        self.blocking_seconds.inc_by(elapsed.as_secs_f64());
        debug!(
            "acquisition of {} {} permits blocked for {:?}",
            self.name, requested_permits, elapsed
        );
        wrap(ret.expect("semaphore is never closed"))
    }
}
2678
/// A handle to permits acquired from a [`MetricsSemaphore`]; releasing it (by
/// drop) returns the permits and records the release in metrics.
#[derive(Debug)]
pub struct MetricsPermits {
    // The underlying tokio permits; returned to the semaphore on drop.
    _permit: OwnedSemaphorePermit,
    // Counter bumped by `count` in the Drop impl below.
    released_metric: IntCounter,
    // How many permits this handle represents.
    count: u32,
}
2685
2686impl Drop for MetricsPermits {
2687 fn drop(&mut self) {
2688 self.released_metric.inc_by(u64::from(self.count))
2689 }
2690}
2691
/// Metrics for one kind of external operation (a blob or consensus API call),
/// maintained by [`ExternalOpMetrics::run_op`] / [`ExternalOpMetrics::run_stream`].
#[derive(Debug)]
pub struct ExternalOpMetrics {
    // Count of ops begun.
    started: IntCounter,
    // Count of ops that finished with Ok (for streams: no item erred).
    succeeded: IntCounter,
    // Count of ops that finished with Err (for streams: any item erred).
    failed: IntCounter,
    // Payload bytes; incremented by the wrapping callers per-op.
    bytes: IntCounter,
    // Total elapsed seconds across all ops.
    seconds: Counter,
    // Optional latency histogram; only some ops record one.
    seconds_histogram: Option<Histogram>,
    // Shared alert counters, handed to each op's error callback.
    alerts_metrics: Arc<AlertsMetrics>,
}
2702
impl ExternalOpMetrics {
    /// Runs `op_fn`, recording started/succeeded/failed counts and elapsed
    /// seconds. On error, `on_err_fn` is invoked with the alert metrics so
    /// each call-site can classify which failures should alert.
    async fn run_op<R, F, OpFn, ErrFn>(
        &self,
        op_fn: OpFn,
        on_err_fn: ErrFn,
    ) -> Result<R, ExternalError>
    where
        F: std::future::Future<Output = Result<R, ExternalError>>,
        OpFn: FnOnce() -> F,
        ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
    {
        self.started.inc();
        let start = Instant::now();
        let res = op_fn().await;
        let elapsed_seconds = start.elapsed().as_secs_f64();
        self.seconds.inc_by(elapsed_seconds);
        // The histogram is only present for ops that opted in to one.
        if let Some(h) = &self.seconds_histogram {
            h.observe(elapsed_seconds);
        }
        match res.as_ref() {
            Ok(_) => self.succeeded.inc(),
            Err(err) => {
                self.failed.inc();
                on_err_fn(&self.alerts_metrics, err);
            }
        };
        res
    }

    /// Streaming analog of [`Self::run_op`]: wraps the stream produced by
    /// `op_fn`, accounting the whole stream as one operation. It counts as
    /// failed if any yielded item is an `Err`; timing covers stream creation
    /// through exhaustion.
    fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
        &'a self,
        op_fn: OpFn,
        mut on_err_fn: ErrFn,
    ) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
    where
        S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
        OpFn: FnOnce() -> S,
        ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
    {
        self.started.inc();
        let start = Instant::now();
        let mut stream = op_fn();
        stream! {
            let mut succeeded = true;
            while let Some(res) = stream.next().await {
                if let Err(err) = res.as_ref() {
                    on_err_fn(&self.alerts_metrics, err);
                    succeeded = false;
                }
                yield res;
            }
            if succeeded {
                self.succeeded.inc()
            } else {
                self.failed.inc()
            }
            // NOTE(review): this tail (and the success/failure accounting
            // above) only runs if the consumer polls the stream to completion;
            // a stream dropped early records `started` but no outcome.
            let elapsed_seconds = start.elapsed().as_secs_f64();
            self.seconds.inc_by(elapsed_seconds);
            if let Some(h) = &self.seconds_histogram {
                h.observe(elapsed_seconds);
            }
        }
    }
}
2767
/// Per-operation metrics for [`Blob`] calls, recorded by [`MetricsBlob`].
#[derive(Debug)]
pub struct BlobMetrics {
    set: ExternalOpMetrics,
    get: ExternalOpMetrics,
    list_keys: ExternalOpMetrics,
    delete: ExternalOpMetrics,
    restore: ExternalOpMetrics,
    // Count of deletes that found no blob to delete.
    delete_noop: IntCounter,
    // Distribution of written blob sizes, observed on successful sets.
    blob_sizes: Histogram,
    // Round-trip latency gauge; updated elsewhere (not visible in this chunk).
    pub rtt_latency: Gauge,
}
2779
/// A [`Blob`] decorator that instruments every call with the op metrics in
/// `metrics.blob` (counts, payload bytes, latency).
#[derive(Debug)]
pub struct MetricsBlob {
    // The delegate that does the real work.
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
}
2785
2786impl MetricsBlob {
2787 pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
2788 MetricsBlob { blob, metrics }
2789 }
2790
2791 fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
2792 alerts_metrics.blob_failures.inc()
2793 }
2794}
2795
#[async_trait]
impl Blob for MetricsBlob {
    /// Fetches `key`, counting the op and (on a hit) the bytes returned.
    #[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
        let res = self
            .metrics
            .blob
            .get
            .run_op(|| self.blob.get(key), Self::on_err)
            .await;
        // Only successful gets that actually found a value add to byte counts.
        if let Ok(Some(value)) = res.as_ref() {
            self.metrics
                .blob
                .get
                .bytes
                .inc_by(u64::cast_from(value.len()));
        }
        res
    }

    /// Lists blobs under `key_prefix`, invoking `f` per entry.
    #[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError> {
        // Track total key bytes handed to `f` as this op's "bytes" metric.
        let mut byte_total = 0;
        let mut instrumented = |blob_metadata: BlobMetadata| {
            byte_total += blob_metadata.key.len();
            f(blob_metadata)
        };

        let res = self
            .metrics
            .blob
            .list_keys
            .run_op(
                || {
                    self.blob
                        .list_keys_and_metadata(key_prefix, &mut instrumented)
                },
                Self::on_err,
            )
            .await;

        // Recorded even on error: entries seen before a failure still count.
        self.metrics
            .blob
            .list_keys
            .bytes
            .inc_by(u64::cast_from(byte_total));

        res
    }

    /// Writes `value` at `key`; on success records both the op byte count
    /// and the blob-size distribution.
    #[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
    async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
        // Capture the length up front; `value` is moved into the closure.
        let bytes = value.len();
        let res = self
            .metrics
            .blob
            .set
            .run_op(|| self.blob.set(key, value), Self::on_err)
            .await;
        if res.is_ok() {
            self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
            self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
        }
        res
    }

    /// Deletes `key`, distinguishing real deletions (whose reported size feeds
    /// the bytes counter) from no-op deletes of absent keys.
    #[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
        let bytes = self
            .metrics
            .blob
            .delete
            .run_op(|| self.blob.delete(key), Self::on_err)
            .await?;
        if let Some(bytes) = bytes {
            self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
        } else {
            // Nothing existed at `key`; track that separately.
            self.metrics.blob.delete_noop.inc();
        }
        Ok(bytes)
    }

    /// Passes through to the inner blob's `restore`, with op metrics.
    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
        self.metrics
            .blob
            .restore
            .run_op(|| self.blob.restore(key), Self::on_err)
            .await
    }
}
2892
/// Per-operation metrics for [`Consensus`] calls, recorded by
/// [`MetricsConsensus`].
#[derive(Debug)]
pub struct ConsensusMetrics {
    list_keys: ExternalOpMetrics,
    head: ExternalOpMetrics,
    compare_and_set: ExternalOpMetrics,
    scan: ExternalOpMetrics,
    truncate: ExternalOpMetrics,
    // Total number of entries reported deleted by truncate calls.
    truncated_count: IntCounter,
    // Round-trip latency gauge; updated elsewhere (not visible in this chunk).
    pub rtt_latency: Gauge,
}
2903
/// A [`Consensus`] decorator that instruments every call with the op metrics
/// in `metrics.consensus` (counts, payload bytes, latency).
#[derive(Debug)]
pub struct MetricsConsensus {
    // The delegate that does the real work.
    consensus: Arc<dyn Consensus>,
    metrics: Arc<Metrics>,
}
2909
2910impl MetricsConsensus {
2911 pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
2912 MetricsConsensus { consensus, metrics }
2913 }
2914
2915 fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
2916 if let ExternalError::Indeterminate(_) = err {
2920 alerts_metrics.consensus_failures.inc()
2921 }
2922 }
2923}
2924
#[async_trait]
impl Consensus for MetricsConsensus {
    /// Streams all keys, accounted as a single `list_keys` operation.
    fn list_keys(&self) -> ResultStream<'_, String> {
        Box::pin(
            self.metrics
                .consensus
                .list_keys
                .run_stream(|| self.consensus.list_keys(), Self::on_err),
        )
    }

    /// Fetches the most recent entry for `key`, counting payload bytes on hit.
    #[instrument(name = "consensus::head", fields(shard=key))]
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
        let res = self
            .metrics
            .consensus
            .head
            .run_op(|| self.consensus.head(key), Self::on_err)
            .await;
        if let Ok(Some(data)) = res.as_ref() {
            self.metrics
                .consensus
                .head
                .bytes
                .inc_by(u64::cast_from(data.data.len()));
        }
        res
    }

    /// Compare-and-sets `key`; payload bytes only count for committed writes.
    #[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError> {
        // Capture the length up front; `new` is moved into the closure.
        let bytes = new.data.len();
        let res = self
            .metrics
            .consensus
            .compare_and_set
            .run_op(
                || self.consensus.compare_and_set(key, expected, new),
                Self::on_err,
            )
            .await;
        match res.as_ref() {
            Ok(CaSResult::Committed) => self
                .metrics
                .consensus
                .compare_and_set
                .bytes
                .inc_by(u64::cast_from(bytes)),
            // Mismatches and errors wrote nothing, so count no bytes.
            Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
        }
        res
    }

    /// Scans entries for `key` starting at `from`, summing payload bytes
    /// across everything returned.
    #[instrument(name = "consensus::scan", fields(shard=key))]
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError> {
        let res = self
            .metrics
            .consensus
            .scan
            .run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
            .await;
        if let Ok(dataz) = res.as_ref() {
            let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
            self.metrics
                .consensus
                .scan
                .bytes
                .inc_by(u64::cast_from(bytes));
        }
        res
    }

    /// Truncates `key`'s history at `seqno`, accumulating the number of
    /// deleted entries when the delegate reports one.
    #[instrument(name = "consensus::truncate", fields(shard=key))]
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
        let metrics = &self.metrics.consensus;
        let deleted = metrics
            .truncate
            .run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
            .await?;
        if let Some(deleted) = deleted {
            metrics.truncated_count.inc_by(u64::cast_from(deleted));
        }
        Ok(deleted)
    }
}
3020
/// Prometheus gauges mirroring a tokio [`TaskMonitor`]'s cumulative metrics.
///
/// Each gauge is paired with an extractor fn that pulls the corresponding
/// value out of a [`tokio_metrics::TaskMetrics`] snapshot at collect time
/// (see the [`Collector`] impl below).
#[derive(Debug, Clone)]
pub struct TaskMetrics {
    f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
    u64_gauges: Vec<(
        GenericGauge<AtomicU64>,
        fn(&tokio_metrics::TaskMetrics) -> u64,
    )>,
    // The monitor whose cumulative snapshot feeds the gauges above.
    monitor: TaskMonitor,
}
3032
impl TaskMetrics {
    /// Creates (but does not register) the task gauges for `name`, which is
    /// attached to each metric as a `name` const label. Registration happens
    /// via [`Collector`], e.g. in `TasksMetrics::new`.
    pub fn new(name: &str) -> Self {
        let monitor = TaskMonitor::new();
        Self {
            f64_gauges: vec![
                (
                    Gauge::make_collector(metric!(
                        name: "mz_persist_task_total_idle_duration",
                        help: "Seconds of time spent idling, ie. waiting for a task to be woken up.",
                        const_labels: {"name" => name}
                    )),
                    |m| m.total_idle_duration.as_secs_f64(),
                ),
                (
                    Gauge::make_collector(metric!(
                        name: "mz_persist_task_total_scheduled_duration",
                        help: "Seconds of time spent scheduled, ie. ready to poll but not yet polled.",
                        const_labels: {"name" => name}
                    )),
                    |m| m.total_scheduled_duration.as_secs_f64(),
                ),
            ],
            u64_gauges: vec![
                (
                    MakeCollector::make_collector(metric!(
                        name: "mz_persist_task_total_scheduled_count",
                        help: "The total number of task schedules. Useful for computing the average scheduled time.",
                        const_labels: {"name" => name}
                    )),
                    |m| m.total_scheduled_count,
                ),
                (
                    MakeCollector::make_collector(metric!(
                        name: "mz_persist_task_total_idled_count",
                        help: "The total number of task idles. Useful for computing the average idle time.",
                        const_labels: {"name" => name}
                    ,
                    )),
                    |m| m.total_idled_count,
                ),
            ],
            monitor,
        }
    }

    /// Instruments `task` so its polls are tracked by this set of metrics'
    /// monitor.
    pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
        TaskMonitor::instrument(&self.monitor, task)
    }
}
3084
3085impl Collector for TaskMetrics {
3086 fn desc(&self) -> Vec<&Desc> {
3087 let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3088 for (g, _) in &self.f64_gauges {
3089 descs.extend(g.desc());
3090 }
3091 for (g, _) in &self.u64_gauges {
3092 descs.extend(g.desc());
3093 }
3094 descs
3095 }
3096
3097 fn collect(&self) -> Vec<MetricFamily> {
3098 let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3099 let metrics = self.monitor.cumulative();
3100 for (g, metrics_fn) in &self.f64_gauges {
3101 g.set(metrics_fn(&metrics));
3102 families.extend(g.collect());
3103 }
3104 for (g, metrics_fn) in &self.u64_gauges {
3105 g.set(metrics_fn(&metrics));
3106 families.extend(g.collect());
3107 }
3108 families
3109 }
3110}
3111
/// Metrics for instrumented persist-internal tokio tasks.
#[derive(Debug)]
pub struct TasksMetrics {
    // Task metrics labeled "heartbeat_read".
    pub heartbeat_read: TaskMetrics,
}
3116
3117impl TasksMetrics {
3118 fn new(registry: &MetricsRegistry) -> Self {
3119 let heartbeat_read = TaskMetrics::new("heartbeat_read");
3120 registry.register_collector(heartbeat_read.clone());
3121 TasksMetrics { heartbeat_read }
3122 }
3123}
3124
/// Metrics for persist's schema cache and fetch-path schema migrations.
#[derive(Debug)]
pub struct SchemaMetrics {
    // Count of state fetches performed by the schema cache.
    pub(crate) cache_fetch_state_count: IntCounter,
    // Cache counters for the schema lookup path.
    pub(crate) cache_schema: SchemaCacheMetrics,
    // Cache counters for the migration lookup path.
    pub(crate) cache_migration: SchemaCacheMetrics,
    // Migration counts, partitioned by "op" label: same/codec/either.
    pub(crate) migration_count_same: IntCounter,
    pub(crate) migration_count_codec: IntCounter,
    pub(crate) migration_count_either: IntCounter,
    // Migrated update-record counts, partitioned by "op" label.
    pub(crate) migration_len_legacy_codec: IntCounter,
    pub(crate) migration_len_either_codec: IntCounter,
    pub(crate) migration_len_either_arrow: IntCounter,
    // Count of migrations constructed.
    pub(crate) migration_new_count: IntCounter,
    // Seconds spent constructing migration logic.
    pub(crate) migration_new_seconds: Counter,
    // Seconds spent applying migration logic.
    pub(crate) migration_migrate_seconds: Counter,
}
3140
impl SchemaMetrics {
    /// Registers all schema cache and migration metrics against `registry`.
    fn new(registry: &MetricsRegistry) -> Self {
        // The cache counters are metric vecs keyed by an "op" label, so the
        // schema and migration caches share metric names.
        let cached: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_cached_count",
            help: "count of schema cache entries served from cache",
            var_labels: ["op"],
        ));
        let computed: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_computed_count",
            help: "count of schema cache entries computed",
            var_labels: ["op"],
        ));
        let unavailable: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_unavailable_count",
            help: "count of schema cache entries unavailable at current state",
            var_labels: ["op"],
        ));
        let added: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_added_count",
            help: "count of schema cache entries added",
            var_labels: ["op"],
        ));
        let dropped: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_cache_dropped_count",
            help: "count of schema cache entries dropped",
            var_labels: ["op"],
        ));
        // Pre-binds the "op" label for one cache use-site.
        let cache = |name| SchemaCacheMetrics {
            cached_count: cached.with_label_values(&[name]),
            computed_count: computed.with_label_values(&[name]),
            unavailable_count: unavailable.with_label_values(&[name]),
            added_count: added.with_label_values(&[name]),
            dropped_count: dropped.with_label_values(&[name]),
        };
        // Migration metric vecs, likewise partitioned by an "op" label.
        let migration_count: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_migration_count",
            help: "count of fetch part migrations",
            var_labels: ["op"],
        ));
        let migration_len: IntCounterVec = registry.register(metric!(
            name: "mz_persist_schema_migration_len",
            help: "count of migrated update records",
            var_labels: ["op"],
        ));
        SchemaMetrics {
            cache_fetch_state_count: registry.register(metric!(
                name: "mz_persist_schema_cache_fetch_state_count",
                help: "count of state fetches by the schema cache",
            )),
            cache_schema: cache("schema"),
            cache_migration: cache("migration"),
            migration_count_same: migration_count.with_label_values(&["same"]),
            migration_count_codec: migration_count.with_label_values(&["codec"]),
            migration_count_either: migration_count.with_label_values(&["either"]),
            migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
            migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
            migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
            migration_new_count: registry.register(metric!(
                name: "mz_persist_schema_migration_new_count",
                help: "count of migrations constructed",
            )),
            migration_new_seconds: registry.register(metric!(
                name: "mz_persist_schema_migration_new_seconds",
                help: "seconds spent constructing migration logic",
            )),
            migration_migrate_seconds: registry.register(metric!(
                name: "mz_persist_schema_migration_migrate_seconds",
                help: "seconds spent applying migration logic",
            )),
        }
    }
}
3213
/// Counters for one schema-cache use-site (one value of the "op" label; see
/// `SchemaMetrics::new`).
#[derive(Debug, Clone)]
pub struct SchemaCacheMetrics {
    // Entries served from cache.
    pub(crate) cached_count: IntCounter,
    // Entries computed fresh.
    pub(crate) computed_count: IntCounter,
    // Entries unavailable at the current state.
    pub(crate) unavailable_count: IntCounter,
    // Entries added to the cache.
    pub(crate) added_count: IntCounter,
    // Entries dropped from the cache.
    pub(crate) dropped_count: IntCounter,
}
3222
/// Metrics for parts written inline into persist state.
#[derive(Debug)]
pub struct InlineMetrics {
    // Count of inline parts committed to state.
    pub(crate) part_commit_count: IntCounter,
    // Total bytes of inline parts committed to state.
    pub(crate) part_commit_bytes: IntCounter,
    // Batch-write metrics labeled "inline_backpressure".
    pub(crate) backpressure: BatchWriteMetrics,
}
3229
3230impl InlineMetrics {
3231 fn new(registry: &MetricsRegistry) -> Self {
3232 InlineMetrics {
3233 part_commit_count: registry.register(metric!(
3234 name: "mz_persist_inline_part_commit_count",
3235 help: "count of inline parts committed to state",
3236 )),
3237 part_commit_bytes: registry.register(metric!(
3238 name: "mz_persist_inline_part_commit_bytes",
3239 help: "total size of of inline parts committed to state",
3240 )),
3241 backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
3242 }
3243 }
3244}
3245
3246fn blob_key_shard_id(key: &str) -> Option<String> {
3247 let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
3248 Some(shard_id.to_string())
3249}
3250
3251pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
3253 match ts.elements().first() {
3263 Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
3264 None => i64::MAX,
3265 }
3266}