1use async_stream::stream;
13use mz_persist_types::stats::PartStatsMetrics;
14use std::collections::BTreeMap;
15use std::sync::{Arc, Mutex, Weak};
16use std::time::{Duration, Instant};
17use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::instrument;
25use mz_ore::metric;
26use mz_ore::metrics::{
27 ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
28 DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
29 UIntGaugeVec, raw,
30};
31use mz_ore::stats::histogram_seconds_buckets;
32use mz_persist::location::{
33 Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
34};
35use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
36use mz_persist::retry::RetryStream;
37use mz_persist_types::Codec64;
38use mz_postgres_client::metrics::PostgresClientMetrics;
39use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
40use prometheus::proto::MetricFamily;
41use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
42use timely::progress::Antichain;
43use tokio_metrics::TaskMonitor;
44use tracing::{Instrument, debug, info, info_span};
45
46use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
47use crate::internal::paths::BlobKey;
48use crate::{PersistConfig, ShardId};
49
/// Handles to every metric persist reports.
///
/// Built once by [`Metrics::new`], which registers the underlying collectors
/// with a [`MetricsRegistry`]; the sub-structs group the handles by area.
pub struct Metrics {
    /// Labeled metric vectors from which `blob`, `consensus`, `cmds`,
    /// `retries`, `codecs`, `read`, and `locks` are derived.
    _vecs: MetricsVecs,
    /// Computed gauge `mz_persist_metadata_seconds`: seconds elapsed since
    /// construction, with build metadata as constant labels.
    _uptime: ComputedGauge,

    /// Metrics for [`Blob`] operations.
    pub blob: BlobMetrics,
    /// Metrics for [`Consensus`] operations.
    pub consensus: ConsensusMetrics,
    /// Per-command metrics (labeled by command name).
    pub cmds: CmdsMetrics,
    /// Retry-loop metrics (labeled by retry site).
    pub retries: RetriesMetrics,
    /// Batch-write metrics for user writes (metric prefix `mz_persist_user_`).
    pub user: BatchWriteMetrics,
    /// Batch part read metrics, split by the kind of read.
    pub read: BatchPartReadMetrics,
    /// Compaction metrics.
    pub compaction: CompactionMetrics,
    /// Garbage collection metrics.
    pub gc: GcMetrics,
    /// Lease metrics.
    pub lease: LeaseMetrics,
    /// Encode/decode metrics per codec.
    pub codecs: CodecsMetrics,
    /// State metrics.
    pub state: StateMetrics,
    /// Shard metrics.
    pub shards: ShardsMetrics,
    /// Usage audit metrics.
    pub audit: UsageAuditMetrics,
    /// Lock acquisition metrics (labeled by lock site).
    pub locks: LocksMetrics,
    /// Watch metrics.
    pub watch: WatchMetrics,
    /// PubSub client metrics.
    pub pubsub_client: PubSubClientMetrics,
    /// Pushdown metrics.
    pub pushdown: PushdownMetrics,
    /// Consolidation metrics.
    pub consolidation: ConsolidationMetrics,
    /// Blob memory cache metrics.
    pub blob_cache_mem: BlobMemCache,
    /// Task metrics.
    pub tasks: TasksMetrics,
    /// Columnar metrics; built from `s3_blob.lgbytes` and the config set.
    pub columnar: ColumnarMetrics,
    /// Schema metrics.
    pub schema: SchemaMetrics,
    /// Inline-write metrics.
    pub inline: InlineMetrics,
    /// Semaphore metrics; constructed with owned clones of the config and
    /// registry.
    pub(crate) semaphore: SemaphoreMetrics,

    /// Sink metrics.
    pub sink: SinkMetrics,

    /// S3 blob metrics.
    pub s3_blob: S3BlobMetrics,
    /// Postgres client metrics for consensus, constructed with the
    /// `"mz_persist"` name argument.
    pub postgres_consensus: PostgresClientMetrics,

    /// The registry these metrics were registered with. Not read in the
    /// visible code, hence the `dead_code` allow.
    #[allow(dead_code)]
    pub(crate) registry: MetricsRegistry,
}
119
120impl std::fmt::Debug for Metrics {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("Metrics").finish_non_exhaustive()
123 }
124}
125
126impl Metrics {
127 pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
129 let vecs = MetricsVecs::new(registry);
130 let start = Instant::now();
131 let uptime = registry.register_computed_gauge(
132 metric!(
133 name: "mz_persist_metadata_seconds",
134 help: "server uptime, labels are build metadata",
135 const_labels: {
136 "version" => cfg.build_version,
137 "build_type" => if cfg!(release) { "release" } else { "debug" }
138 },
139 ),
140 move || start.elapsed().as_secs_f64(),
141 );
142 let s3_blob = S3BlobMetrics::new(registry);
143 let columnar = ColumnarMetrics::new(
144 registry,
145 &s3_blob.lgbytes,
146 Arc::clone(&cfg.configs),
147 cfg.is_cc_active,
148 );
149 Metrics {
150 blob: vecs.blob_metrics(),
151 consensus: vecs.consensus_metrics(),
152 cmds: vecs.cmds_metrics(registry),
153 retries: vecs.retries_metrics(),
154 codecs: vecs.codecs_metrics(),
155 user: BatchWriteMetrics::new(registry, "user"),
156 read: vecs.batch_part_read_metrics(),
157 compaction: CompactionMetrics::new(registry),
158 gc: GcMetrics::new(registry),
159 lease: LeaseMetrics::new(registry),
160 state: StateMetrics::new(registry),
161 shards: ShardsMetrics::new(registry),
162 audit: UsageAuditMetrics::new(registry),
163 locks: vecs.locks_metrics(),
164 watch: WatchMetrics::new(registry),
165 pubsub_client: PubSubClientMetrics::new(registry),
166 pushdown: PushdownMetrics::new(registry),
167 consolidation: ConsolidationMetrics::new(registry),
168 blob_cache_mem: BlobMemCache::new(registry),
169 tasks: TasksMetrics::new(registry),
170 columnar,
171 schema: SchemaMetrics::new(registry),
172 inline: InlineMetrics::new(registry),
173 semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
174 sink: SinkMetrics::new(registry),
175 s3_blob,
176 postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
177 _vecs: vecs,
178 _uptime: uptime,
179 registry: registry.clone(),
180 }
181 }
182
183 pub fn write_amplification(&self) -> f64 {
188 let total_written = self.blob.set.bytes.get();
191 let user_written = self.user.goodbytes.get();
192 #[allow(clippy::as_conversions)]
193 {
194 total_written as f64 / user_written as f64
195 }
196 }
197}
198
/// Labeled metric vectors registered once and shared by the per-operation
/// metric structs built from them by the `*_metrics` constructors on this
/// type.
#[derive(Debug)]
struct MetricsVecs {
    // Command metrics, labeled `cmd`.
    cmd_started: IntCounterVec,
    cmd_cas_mismatch: IntCounterVec,
    cmd_succeeded: IntCounterVec,
    cmd_failed: IntCounterVec,
    cmd_seconds: CounterVec,

    // External service call metrics, labeled `op` (except `external_rtt_latency`,
    // labeled `external`, and the unlabeled truncate/delete-noop/size metrics).
    external_op_started: IntCounterVec,
    external_op_succeeded: IntCounterVec,
    external_op_failed: IntCounterVec,
    external_op_bytes: IntCounterVec,
    external_op_seconds: CounterVec,
    external_consensus_truncated_count: IntCounter,
    external_blob_delete_noop_count: IntCounter,
    external_blob_sizes: Histogram,
    external_rtt_latency: GaugeVec,
    external_op_latency: HistogramVec,

    // Retry-loop metrics, labeled `op`.
    retry_started: IntCounterVec,
    retry_finished: IntCounterVec,
    retry_retries: IntCounterVec,
    retry_sleep_seconds: CounterVec,

    // Codec metrics, labeled `op` (the codec name).
    encode_count: IntCounterVec,
    encode_seconds: CounterVec,
    decode_count: IntCounterVec,
    decode_seconds: CounterVec,

    // Batch part read metrics, labeled `op` (the kind of read).
    read_part_bytes: IntCounterVec,
    read_part_goodbytes: IntCounterVec,
    read_part_count: IntCounterVec,
    read_part_seconds: CounterVec,
    read_ts_rewrite: IntCounterVec,

    // Lock metrics, labeled `op` (the lock site).
    lock_acquire_count: IntCounterVec,
    lock_blocking_acquire_count: IntCounterVec,
    lock_blocking_seconds: CounterVec,

    // Shared (via `Arc`) into every `ExternalOpMetrics` built from this struct.
    alerts_metrics: Arc<AlertsMetrics>,
}
241
242impl MetricsVecs {
243 fn new(registry: &MetricsRegistry) -> Self {
244 MetricsVecs {
245 cmd_started: registry.register(metric!(
246 name: "mz_persist_cmd_started_count",
247 help: "count of commands started",
248 var_labels: ["cmd"],
249 )),
250 cmd_cas_mismatch: registry.register(metric!(
251 name: "mz_persist_cmd_cas_mismatch_count",
252 help: "count of command retries from CaS mismatch",
253 var_labels: ["cmd"],
254 )),
255 cmd_succeeded: registry.register(metric!(
256 name: "mz_persist_cmd_succeeded_count",
257 help: "count of commands succeeded",
258 var_labels: ["cmd"],
259 )),
260 cmd_failed: registry.register(metric!(
261 name: "mz_persist_cmd_failed_count",
262 help: "count of commands failed",
263 var_labels: ["cmd"],
264 )),
265 cmd_seconds: registry.register(metric!(
266 name: "mz_persist_cmd_seconds",
267 help: "time spent applying commands",
268 var_labels: ["cmd"],
269 )),
270
271 external_op_started: registry.register(metric!(
272 name: "mz_persist_external_started_count",
273 help: "count of external service calls started",
274 var_labels: ["op"],
275 )),
276 external_op_succeeded: registry.register(metric!(
277 name: "mz_persist_external_succeeded_count",
278 help: "count of external service calls succeeded",
279 var_labels: ["op"],
280 )),
281 external_op_failed: registry.register(metric!(
282 name: "mz_persist_external_failed_count",
283 help: "count of external service calls failed",
284 var_labels: ["op"],
285 )),
286 external_op_bytes: registry.register(metric!(
287 name: "mz_persist_external_bytes_count",
288 help: "total size represented by external service calls",
289 var_labels: ["op"],
290 )),
291 external_op_seconds: registry.register(metric!(
292 name: "mz_persist_external_seconds",
293 help: "time spent in external service calls",
294 var_labels: ["op"],
295 )),
296 external_consensus_truncated_count: registry.register(metric!(
297 name: "mz_persist_external_consensus_truncated_count",
298 help: "count of versions deleted by consensus truncate calls",
299 )),
300 external_blob_delete_noop_count: registry.register(metric!(
301 name: "mz_persist_external_blob_delete_noop_count",
302 help: "count of blob delete calls that deleted a non-existent key",
303 )),
304 external_blob_sizes: registry.register(metric!(
305 name: "mz_persist_external_blob_sizes",
306 help: "histogram of blob sizes at put time",
307 buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
308 )),
309 external_rtt_latency: registry.register(metric!(
310 name: "mz_persist_external_rtt_latency",
311 help: "roundtrip-time to external service as seen by this process",
312 var_labels: ["external"],
313 )),
314 external_op_latency: registry.register(metric!(
315 name: "mz_persist_external_op_latency",
316 help: "rountrip latency observed by individual performance-critical operations",
317 var_labels: ["op"],
318 buckets: histogram_seconds_buckets(0.000_500, 32.0),
321 )),
322
323 retry_started: registry.register(metric!(
324 name: "mz_persist_retry_started_count",
325 help: "count of retry loops started",
326 var_labels: ["op"],
327 )),
328 retry_finished: registry.register(metric!(
329 name: "mz_persist_retry_finished_count",
330 help: "count of retry loops finished",
331 var_labels: ["op"],
332 )),
333 retry_retries: registry.register(metric!(
334 name: "mz_persist_retry_retries_count",
335 help: "count of total attempts by retry loops",
336 var_labels: ["op"],
337 )),
338 retry_sleep_seconds: registry.register(metric!(
339 name: "mz_persist_retry_sleep_seconds",
340 help: "time spent in retry loop backoff",
341 var_labels: ["op"],
342 )),
343
344 encode_count: registry.register(metric!(
345 name: "mz_persist_encode_count",
346 help: "count of op encodes",
347 var_labels: ["op"],
348 )),
349 encode_seconds: registry.register(metric!(
350 name: "mz_persist_encode_seconds",
351 help: "time spent in op encodes",
352 var_labels: ["op"],
353 )),
354 decode_count: registry.register(metric!(
355 name: "mz_persist_decode_count",
356 help: "count of op decodes",
357 var_labels: ["op"],
358 )),
359 decode_seconds: registry.register(metric!(
360 name: "mz_persist_decode_seconds",
361 help: "time spent in op decodes",
362 var_labels: ["op"],
363 )),
364
365 read_part_bytes: registry.register(metric!(
366 name: "mz_persist_read_batch_part_bytes",
367 help: "total encoded size of batch parts read",
368 var_labels: ["op"],
369 )),
370 read_part_goodbytes: registry.register(metric!(
371 name: "mz_persist_read_batch_part_goodbytes",
372 help: "total logical size of batch parts read",
373 var_labels: ["op"],
374 )),
375 read_part_count: registry.register(metric!(
376 name: "mz_persist_read_batch_part_count",
377 help: "count of batch parts read",
378 var_labels: ["op"],
379 )),
380 read_part_seconds: registry.register(metric!(
381 name: "mz_persist_read_batch_part_seconds",
382 help: "time spent reading batch parts",
383 var_labels: ["op"],
384 )),
385 read_ts_rewrite: registry.register(metric!(
386 name: "mz_persist_read_ts_rewite",
387 help: "count of updates read with rewritten ts",
388 var_labels: ["op"],
389 )),
390
391 lock_acquire_count: registry.register(metric!(
392 name: "mz_persist_lock_acquire_count",
393 help: "count of locks acquired",
394 var_labels: ["op"],
395 )),
396 lock_blocking_acquire_count: registry.register(metric!(
397 name: "mz_persist_lock_blocking_acquire_count",
398 help: "count of locks acquired that required blocking",
399 var_labels: ["op"],
400 )),
401 lock_blocking_seconds: registry.register(metric!(
402 name: "mz_persist_lock_blocking_seconds",
403 help: "time spent blocked for a lock",
404 var_labels: ["op"],
405 )),
406
407 alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
408 }
409 }
410
411 fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
412 CmdsMetrics {
413 init_state: self.cmd_metrics("init_state"),
414 add_rollup: self.cmd_metrics("add_rollup"),
415 remove_rollups: self.cmd_metrics("remove_rollups"),
416 register: self.cmd_metrics("register"),
417 compare_and_append: self.cmd_metrics("compare_and_append"),
418 compare_and_append_noop: registry.register(metric!(
419 name: "mz_persist_cmd_compare_and_append_noop",
420 help: "count of compare_and_append retries that were discoverd to have already committed",
421 )),
422 compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
423 downgrade_since: self.cmd_metrics("downgrade_since"),
424 heartbeat_reader: self.cmd_metrics("heartbeat_reader"),
425 expire_reader: self.cmd_metrics("expire_reader"),
426 expire_writer: self.cmd_metrics("expire_writer"),
427 merge_res: self.cmd_metrics("merge_res"),
428 become_tombstone: self.cmd_metrics("become_tombstone"),
429 compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
430 spine_exert: self.cmd_metrics("spine_exert"),
431 fetch_upper_count: registry.register(metric!(
432 name: "mz_persist_cmd_fetch_upper_count",
433 help: "count of fetch_upper calls",
434 ))
435 }
436 }
437
438 fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
439 CmdMetrics {
440 name: cmd.to_owned(),
441 started: self.cmd_started.with_label_values(&[cmd]),
442 succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
443 cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
444 failed: self.cmd_failed.with_label_values(&[cmd]),
445 seconds: self.cmd_seconds.with_label_values(&[cmd]),
446 }
447 }
448
449 fn retries_metrics(&self) -> RetriesMetrics {
450 RetriesMetrics {
451 determinate: RetryDeterminate {
452 apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
453 },
454 external: RetryExternal {
455 batch_delete: Arc::new(self.retry_metrics("batch::delete")),
456 batch_set: self.retry_metrics("batch::set"),
457 blob_open: self.retry_metrics("blob::open"),
458 compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
459 consensus_open: self.retry_metrics("consensus::open"),
460 fetch_batch_get: self.retry_metrics("fetch_batch::get"),
461 fetch_state_scan: self.retry_metrics("fetch_state::scan"),
462 gc_truncate: self.retry_metrics("gc::truncate"),
463 maybe_init_cas: self.retry_metrics("maybe_init::cas"),
464 rollup_delete: self.retry_metrics("rollup::delete"),
465 rollup_get: self.retry_metrics("rollup::get"),
466 rollup_set: self.retry_metrics("rollup::set"),
467 hollow_run_get: self.retry_metrics("hollow_run::get"),
468 hollow_run_set: self.retry_metrics("hollow_run::set"),
469 storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
470 },
471 compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
472 fetch_latest_state: self.retry_metrics("fetch_latest_state"),
473 fetch_live_states: self.retry_metrics("fetch_live_states"),
474 idempotent_cmd: self.retry_metrics("idempotent_cmd"),
475 next_listen_batch: self.retry_metrics("next_listen_batch"),
476 snapshot: self.retry_metrics("snapshot"),
477 }
478 }
479
480 fn retry_metrics(&self, name: &str) -> RetryMetrics {
481 RetryMetrics {
482 name: name.to_owned(),
483 started: self.retry_started.with_label_values(&[name]),
484 finished: self.retry_finished.with_label_values(&[name]),
485 retries: self.retry_retries.with_label_values(&[name]),
486 sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
487 }
488 }
489
490 fn codecs_metrics(&self) -> CodecsMetrics {
491 CodecsMetrics {
492 state: self.codec_metrics("state"),
493 state_diff: self.codec_metrics("state_diff"),
494 batch: self.codec_metrics("batch"),
495 key: self.codec_metrics("key"),
496 val: self.codec_metrics("val"),
497 }
498 }
499
500 fn codec_metrics(&self, op: &str) -> CodecMetrics {
501 CodecMetrics {
502 encode_count: self.encode_count.with_label_values(&[op]),
503 encode_seconds: self.encode_seconds.with_label_values(&[op]),
504 decode_count: self.decode_count.with_label_values(&[op]),
505 decode_seconds: self.decode_seconds.with_label_values(&[op]),
506 }
507 }
508
509 fn blob_metrics(&self) -> BlobMetrics {
510 BlobMetrics {
511 set: self.external_op_metrics("blob_set", true),
512 get: self.external_op_metrics("blob_get", true),
513 list_keys: self.external_op_metrics("blob_list_keys", false),
514 delete: self.external_op_metrics("blob_delete", false),
515 restore: self.external_op_metrics("restore", false),
516 delete_noop: self.external_blob_delete_noop_count.clone(),
517 blob_sizes: self.external_blob_sizes.clone(),
518 rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
519 }
520 }
521
522 fn consensus_metrics(&self) -> ConsensusMetrics {
523 ConsensusMetrics {
524 list_keys: self.external_op_metrics("consensus_list_keys", false),
525 head: self.external_op_metrics("consensus_head", false),
526 compare_and_set: self.external_op_metrics("consensus_cas", true),
527 scan: self.external_op_metrics("consensus_scan", false),
528 truncate: self.external_op_metrics("consensus_truncate", false),
529 truncated_count: self.external_consensus_truncated_count.clone(),
530 rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
531 }
532 }
533
534 fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
535 ExternalOpMetrics {
536 started: self.external_op_started.with_label_values(&[op]),
537 succeeded: self.external_op_succeeded.with_label_values(&[op]),
538 failed: self.external_op_failed.with_label_values(&[op]),
539 bytes: self.external_op_bytes.with_label_values(&[op]),
540 seconds: self.external_op_seconds.with_label_values(&[op]),
541 seconds_histogram: if latency_histogram {
542 Some(self.external_op_latency.with_label_values(&[op]))
543 } else {
544 None
545 },
546 alerts_metrics: Arc::clone(&self.alerts_metrics),
547 }
548 }
549
550 fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
551 BatchPartReadMetrics {
552 listen: self.read_metrics("listen"),
553 snapshot: self.read_metrics("snapshot"),
554 batch_fetcher: self.read_metrics("batch_fetcher"),
555 compaction: self.read_metrics("compaction"),
556 unindexed: self.read_metrics("unindexed"),
557 }
558 }
559
560 fn read_metrics(&self, op: &str) -> ReadMetrics {
561 ReadMetrics {
562 part_bytes: self.read_part_bytes.with_label_values(&[op]),
563 part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
564 part_count: self.read_part_count.with_label_values(&[op]),
565 seconds: self.read_part_seconds.with_label_values(&[op]),
566 ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
567 }
568 }
569
570 fn locks_metrics(&self) -> LocksMetrics {
571 LocksMetrics {
572 applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
573 applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
574 applier_write: self.lock_metrics("applier_write"),
575 watch: self.lock_metrics("watch"),
576 }
577 }
578
579 fn lock_metrics(&self, op: &str) -> LockMetrics {
580 LockMetrics {
581 acquire_count: self.lock_acquire_count.with_label_values(&[op]),
582 blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
583 blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
584 }
585 }
586}
587
/// Metrics for a single persist command, resolved from the `cmd`-labeled
/// `mz_persist_cmd_*` vectors.
#[derive(Debug)]
pub struct CmdMetrics {
    /// The command name, also used as the `cmd` label value.
    pub(crate) name: String,
    /// `mz_persist_cmd_started_count`: commands started.
    pub(crate) started: IntCounter,
    /// `mz_persist_cmd_cas_mismatch_count`: command retries from CaS mismatch.
    pub(crate) cas_mismatch: IntCounter,
    /// `mz_persist_cmd_succeeded_count`: commands succeeded.
    pub(crate) succeeded: IntCounter,
    /// `mz_persist_cmd_failed_count`: commands failed.
    pub(crate) failed: IntCounter,
    /// `mz_persist_cmd_seconds`: time spent applying commands.
    pub(crate) seconds: Counter,
}
597
598impl CmdMetrics {
599 pub async fn run_cmd<R, E, F, CmdFn>(
600 &self,
601 shard_metrics: &ShardMetrics,
602 cmd_fn: CmdFn,
603 ) -> Result<R, E>
604 where
605 F: std::future::Future<Output = Result<R, E>>,
606 CmdFn: FnOnce() -> F,
607 {
608 self.started.inc();
609 let start = Instant::now();
610 let res = cmd_fn().await;
611 self.seconds.inc_by(start.elapsed().as_secs_f64());
612 match res.as_ref() {
613 Ok(_) => {
614 self.succeeded.inc();
615 shard_metrics.cmd_succeeded.inc();
616 }
617 Err(_) => self.failed.inc(),
618 };
619 res
620 }
621}
622
/// Per-command metrics. Each [`CmdMetrics`] field is labeled with its own
/// field name as the `cmd` label.
#[derive(Debug)]
pub struct CmdsMetrics {
    pub(crate) init_state: CmdMetrics,
    pub(crate) add_rollup: CmdMetrics,
    pub(crate) remove_rollups: CmdMetrics,
    pub(crate) register: CmdMetrics,
    pub(crate) compare_and_append: CmdMetrics,
    /// `mz_persist_cmd_compare_and_append_noop`: compare_and_append retries
    /// found to have already committed.
    pub(crate) compare_and_append_noop: IntCounter,
    pub(crate) compare_and_downgrade_since: CmdMetrics,
    pub(crate) downgrade_since: CmdMetrics,
    pub(crate) heartbeat_reader: CmdMetrics,
    pub(crate) expire_reader: CmdMetrics,
    pub(crate) expire_writer: CmdMetrics,
    pub(crate) merge_res: CmdMetrics,
    pub(crate) become_tombstone: CmdMetrics,
    pub(crate) compare_and_evolve_schema: CmdMetrics,
    pub(crate) spine_exert: CmdMetrics,
    /// `mz_persist_cmd_fetch_upper_count`: count of fetch_upper calls.
    pub(crate) fetch_upper_count: IntCounter,
}
642
/// Metrics for one retry site, resolved from the `op`-labeled
/// `mz_persist_retry_*` vectors.
#[derive(Debug)]
pub struct RetryMetrics {
    /// The retry site name, also used as the `op` label value.
    pub(crate) name: String,
    /// `mz_persist_retry_started_count`: retry loops started.
    pub(crate) started: IntCounter,
    /// `mz_persist_retry_finished_count`: retry loops finished.
    pub(crate) finished: IntCounter,
    /// `mz_persist_retry_retries_count`: total attempts by retry loops.
    pub(crate) retries: IntCounter,
    /// `mz_persist_retry_sleep_seconds`: time spent in retry loop backoff.
    pub(crate) sleep_seconds: Counter,
}
651
652impl RetryMetrics {
653 pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
654 MetricsRetryStream::new(retry, self)
655 }
656}
657
/// Retry metrics for determinate operations.
#[derive(Debug)]
pub struct RetryDeterminate {
    /// Labeled `apply_unbatched_cmd::cas`.
    pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
}
662
/// Retry metrics for calls to external services. Labels follow the
/// `<area>::<op>` pattern, e.g. `batch::delete` or `rollup::get`.
#[derive(Debug)]
pub struct RetryExternal {
    /// Behind `Arc`, presumably so it can be shared with spawned work
    /// (TODO confirm at use sites).
    pub(crate) batch_delete: Arc<RetryMetrics>,
    pub(crate) batch_set: RetryMetrics,
    pub(crate) blob_open: RetryMetrics,
    /// Behind `Arc`, presumably for the same reason as `batch_delete`
    /// (TODO confirm).
    pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
    pub(crate) consensus_open: RetryMetrics,
    pub(crate) fetch_batch_get: RetryMetrics,
    pub(crate) fetch_state_scan: RetryMetrics,
    pub(crate) gc_truncate: RetryMetrics,
    pub(crate) maybe_init_cas: RetryMetrics,
    pub(crate) rollup_delete: RetryMetrics,
    pub(crate) rollup_get: RetryMetrics,
    pub(crate) rollup_set: RetryMetrics,
    pub(crate) hollow_run_get: RetryMetrics,
    pub(crate) hollow_run_set: RetryMetrics,
    pub(crate) storage_usage_shard_size: RetryMetrics,
}
681
/// All retry-loop metrics. Each leaf is labeled with its field name unless
/// noted on the nested structs.
#[derive(Debug)]
pub struct RetriesMetrics {
    /// Retries of determinate operations.
    pub(crate) determinate: RetryDeterminate,
    /// Retries of external service calls.
    pub(crate) external: RetryExternal,

    pub(crate) compare_and_append_idempotent: RetryMetrics,
    pub(crate) fetch_latest_state: RetryMetrics,
    pub(crate) fetch_live_states: RetryMetrics,
    pub(crate) idempotent_cmd: RetryMetrics,
    pub(crate) next_listen_batch: RetryMetrics,
    pub(crate) snapshot: RetryMetrics,
}
694
/// Batch part read metrics, one [`ReadMetrics`] per kind of read. Each field
/// uses its own name as the `op` label.
#[derive(Debug)]
pub struct BatchPartReadMetrics {
    pub(crate) listen: ReadMetrics,
    pub(crate) snapshot: ReadMetrics,
    pub(crate) batch_fetcher: ReadMetrics,
    pub(crate) compaction: ReadMetrics,
    pub(crate) unindexed: ReadMetrics,
}
703
/// Metrics for one kind of batch part read, resolved from the `op`-labeled
/// read vectors.
#[derive(Debug, Clone)]
pub struct ReadMetrics {
    /// `mz_persist_read_batch_part_bytes`: total encoded size of parts read.
    pub(crate) part_bytes: IntCounter,
    /// `mz_persist_read_batch_part_goodbytes`: total logical size of parts read.
    pub(crate) part_goodbytes: IntCounter,
    /// `mz_persist_read_batch_part_count`: count of parts read.
    pub(crate) part_count: IntCounter,
    /// `mz_persist_read_batch_part_seconds`: time spent reading parts.
    pub(crate) seconds: Counter,
    /// `mz_persist_read_ts_rewite` (sic): updates read with rewritten ts.
    pub(crate) ts_rewrite: IntCounter,
}
712
/// Metrics for writing batches of one family (e.g. `user`, `compaction`).
/// Every metric name is `mz_persist_{name}_...` for the family `name`.
#[derive(Debug, Clone)]
pub struct BatchWriteMetrics {
    /// Total encoded size of batches written.
    pub(crate) bytes: IntCounter,
    /// Total logical size of batches written.
    pub(crate) goodbytes: IntCounter,
    /// Time spent writing batches.
    pub(crate) seconds: Counter,
    /// Writes that stalled waiting on max outstanding requests.
    pub(crate) write_stalls: IntCounter,
    /// Writes that could not write a key lower because the size threshold was
    /// too low.
    pub(crate) key_lower_too_big: IntCounter,

    // Children of `_order_counts` for the `order` label values "unordered",
    // "codec", and "structured".
    pub(crate) unordered: IntCounter,
    pub(crate) codec_order: IntCounter,
    pub(crate) structured_order: IntCounter,
    /// The `order`-labeled vector the three counters above come from.
    _order_counts: IntCounterVec,

    /// Time spent computing update stats.
    pub(crate) step_stats: Counter,
    /// Blocking time spent writing parts.
    pub(crate) step_part_writing: Counter,
    /// Time spent encoding inline batches.
    pub(crate) step_inline: Counter,
}
732
733impl BatchWriteMetrics {
734 fn new(registry: &MetricsRegistry, name: &str) -> Self {
735 let order_counts: IntCounterVec = registry.register(metric!(
736 name: format!("mz_persist_{}_write_batch_order", name),
737 help: "count of batches by the data ordering",
738 var_labels: ["order"],
739 ));
740 let unordered = order_counts.with_label_values(&["unordered"]);
741 let codec_order = order_counts.with_label_values(&["codec"]);
742 let structured_order = order_counts.with_label_values(&["structured"]);
743
744 BatchWriteMetrics {
745 bytes: registry.register(metric!(
746 name: format!("mz_persist_{}_bytes", name),
747 help: format!("total encoded size of {} batches written", name),
748 )),
749 goodbytes: registry.register(metric!(
750 name: format!("mz_persist_{}_goodbytes", name),
751 help: format!("total logical size of {} batches written", name),
752 )),
753 seconds: registry.register(metric!(
754 name: format!("mz_persist_{}_write_batch_part_seconds", name),
755 help: format!("time spent writing {} batches", name),
756 )),
757 write_stalls: registry.register(metric!(
758 name: format!("mz_persist_{}_write_stall_count", name),
759 help: format!(
760 "count of {} writes stalling to await max outstanding reqs",
761 name
762 ),
763 )),
764 key_lower_too_big: registry.register(metric!(
765 name: format!("mz_persist_{}_key_lower_too_big", name),
766 help: format!(
767 "count of {} writes that were unable to write a key lower, because the size threshold was too low",
768 name
769 ),
770 )),
771 unordered,
772 codec_order,
773 structured_order,
774 _order_counts: order_counts,
775 step_stats: registry.register(metric!(
776 name: format!("mz_persist_{}_step_stats", name),
777 help: format!("time spent computing {} update stats", name),
778 )),
779 step_part_writing: registry.register(metric!(
780 name: format!("mz_persist_{}_step_part_writing", name),
781 help: format!("blocking time spent writing parts for {} updates", name),
782 )),
783 step_inline: registry.register(metric!(
784 name: format!("mz_persist_{}_step_inline", name),
785 help: format!("time spent encoding {} inline batches", name)
786 )),
787 }
788 }
789}
790
/// Compaction metrics (`mz_persist_compaction_*`).
#[derive(Debug)]
pub struct CompactionMetrics {
    /// Total compaction requests.
    pub(crate) requested: IntCounter,
    /// Requests dropped due to a full queue.
    pub(crate) dropped: IntCounter,
    /// Requests dropped because compaction was disabled.
    pub(crate) disabled: IntCounter,
    /// Compactions skipped due to heuristics.
    pub(crate) skipped: IntCounter,
    /// Compactions started.
    pub(crate) started: IntCounter,
    /// Compactions applied to state.
    pub(crate) applied: IntCounter,
    /// Compactions that timed out.
    pub(crate) timed_out: IntCounter,
    /// Compactions failed.
    pub(crate) failed: IntCounter,
    /// Compactions discarded as obsolete.
    pub(crate) noop: IntCounter,
    /// Time spent in compaction.
    pub(crate) seconds: Counter,
    /// Requests that ever blocked due to the concurrency limit.
    pub(crate) concurrency_waits: IntCounter,
    /// Time requests spent queued.
    pub(crate) queued_seconds: Counter,
    /// Compaction memory requirement violations.
    pub(crate) memory_violations: IntCounter,
    /// Runs compacted.
    pub(crate) runs_compacted: IntCounter,
    /// Run chunks compacted.
    pub(crate) chunks_compacted: IntCounter,
    /// Compactions where not all inputs were prefetched.
    pub(crate) not_all_prefetched: IntCounter,
    /// Parts fully prefetched by the time they were needed.
    pub(crate) parts_prefetched: IntCounter,
    /// Parts that had to be waited on.
    pub(crate) parts_waited: IntCounter,
    /// Requests that could have used the fast-path optimization.
    pub(crate) fast_path_eligible: IntCounter,
    /// Requests performed by admin tooling.
    pub(crate) admin_count: IntCounter,

    /// Merge results that exactly replaced a SpineBatch.
    pub(crate) applied_exact_match: IntCounter,
    /// Merge results that replaced a subset of a SpineBatch.
    pub(crate) applied_subset_match: IntCounter,
    /// Merge results not applied due to too many updates.
    pub(crate) not_applied_too_many_updates: IntCounter,

    /// Batch-write metrics for the `compaction` family.
    pub(crate) batch: BatchWriteMetrics,
    /// Per-step timings from `mz_persist_compaction_step_seconds`.
    pub(crate) steps: CompactionStepTimings,
    /// Schema-selection counts from `mz_persist_compaction_schema_selection`.
    pub(crate) schema_selection: CompactionSchemaSelection,

    /// The `step`-labeled vector that `steps` is resolved from.
    pub(crate) _steps_vec: CounterVec,
}
824
825impl CompactionMetrics {
826 fn new(registry: &MetricsRegistry) -> Self {
827 let step_timings: CounterVec = registry.register(metric!(
828 name: "mz_persist_compaction_step_seconds",
829 help: "time spent on individual steps of compaction",
830 var_labels: ["step"],
831 ));
832 let schema_selection: CounterVec = registry.register(metric!(
833 name: "mz_persist_compaction_schema_selection",
834 help: "count of compactions and how we did schema selection",
835 var_labels: ["selection"],
836 ));
837
838 CompactionMetrics {
839 requested: registry.register(metric!(
840 name: "mz_persist_compaction_requested",
841 help: "count of total compaction requests",
842 )),
843 dropped: registry.register(metric!(
844 name: "mz_persist_compaction_dropped",
845 help: "count of total compaction requests dropped due to a full queue",
846 )),
847 disabled: registry.register(metric!(
848 name: "mz_persist_compaction_disabled",
849 help: "count of total compaction requests dropped because compaction was disabled",
850 )),
851 skipped: registry.register(metric!(
852 name: "mz_persist_compaction_skipped",
853 help: "count of compactions skipped due to heuristics",
854 )),
855 started: registry.register(metric!(
856 name: "mz_persist_compaction_started",
857 help: "count of compactions started",
858 )),
859 failed: registry.register(metric!(
860 name: "mz_persist_compaction_failed",
861 help: "count of compactions failed",
862 )),
863 applied: registry.register(metric!(
864 name: "mz_persist_compaction_applied",
865 help: "count of compactions applied to state",
866 )),
867 timed_out: registry.register(metric!(
868 name: "mz_persist_compaction_timed_out",
869 help: "count of compactions that timed out",
870 )),
871 noop: registry.register(metric!(
872 name: "mz_persist_compaction_noop",
873 help: "count of compactions discarded (obsolete)",
874 )),
875 seconds: registry.register(metric!(
876 name: "mz_persist_compaction_seconds",
877 help: "time spent in compaction",
878 )),
879 concurrency_waits: registry.register(metric!(
880 name: "mz_persist_compaction_concurrency_waits",
881 help: "count of compaction requests that ever blocked due to concurrency limit",
882 )),
883 queued_seconds: registry.register(metric!(
884 name: "mz_persist_compaction_queued_seconds",
885 help: "time that compaction requests spent queued",
886 )),
887 memory_violations: registry.register(metric!(
888 name: "mz_persist_compaction_memory_violations",
889 help: "count of compaction memory requirement violations",
890 )),
891 runs_compacted: registry.register(metric!(
892 name: "mz_persist_compaction_runs_compacted",
893 help: "count of runs compacted",
894 )),
895 chunks_compacted: registry.register(metric!(
896 name: "mz_persist_compaction_chunks_compacted",
897 help: "count of run chunks compacted",
898 )),
899 not_all_prefetched: registry.register(metric!(
900 name: "mz_persist_compaction_not_all_prefetched",
901 help: "count of compactions where not all inputs were prefetched",
902 )),
903 parts_prefetched: registry.register(metric!(
904 name: "mz_persist_compaction_parts_prefetched",
905 help: "count of compaction parts completely prefetched by the time they're needed",
906 )),
907 parts_waited: registry.register(metric!(
908 name: "mz_persist_compaction_parts_waited",
909 help: "count of compaction parts that had to be waited on",
910 )),
911 fast_path_eligible: registry.register(metric!(
912 name: "mz_persist_compaction_fast_path_eligible",
913 help: "count of compaction requests that could have used the fast-path optimization",
914 )),
915 admin_count: registry.register(metric!(
916 name: "mz_persist_compaction_admin_count",
917 help: "count of compaction requests that were performed by admin tooling",
918 )),
919 applied_exact_match: registry.register(metric!(
920 name: "mz_persist_compaction_applied_exact_match",
921 help: "count of merge results that exactly replaced a SpineBatch",
922 )),
923 applied_subset_match: registry.register(metric!(
924 name: "mz_persist_compaction_applied_subset_match",
925 help: "count of merge results that replaced a subset of a SpineBatch",
926 )),
927 not_applied_too_many_updates: registry.register(metric!(
928 name: "mz_persist_compaction_not_applied_too_many_updates",
929 help: "count of merge results that did not apply due to too many updates",
930 )),
931 batch: BatchWriteMetrics::new(registry, "compaction"),
932 steps: CompactionStepTimings::new(step_timings.clone()),
933 schema_selection: CompactionSchemaSelection::new(schema_selection.clone()),
934 _steps_vec: step_timings,
935 }
936 }
937}
938
/// Per-step compaction timings: children of
/// `mz_persist_compaction_step_seconds` by `step` label.
#[derive(Debug)]
pub struct CompactionStepTimings {
    /// `step="part_fetch"`.
    pub(crate) part_fetch_seconds: Counter,
    /// `step="heap_population"`.
    pub(crate) heap_population_seconds: Counter,
}
944
945impl CompactionStepTimings {
946 fn new(step_timings: CounterVec) -> CompactionStepTimings {
947 CompactionStepTimings {
948 part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
949 heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
950 }
951 }
952}
953
954#[derive(Debug)]
955pub struct CompactionSchemaSelection {
956 pub(crate) recent_schema: Counter,
957 pub(crate) no_schema: Counter,
958}
959
960impl CompactionSchemaSelection {
961 fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
962 CompactionSchemaSelection {
963 recent_schema: schema_selection.with_label_values(&["recent"]),
964 no_schema: schema_selection.with_label_values(&["none"]),
965 }
966 }
967}
968
/// Garbage-collection metrics, registered by `GcMetrics::new`.
#[derive(Debug)]
pub struct GcMetrics {
    /// `mz_persist_gc_noop`: garbage collections skipped because they were already done.
    pub(crate) noop: IntCounter,
    /// `mz_persist_gc_started`: garbage collections started.
    pub(crate) started: IntCounter,
    /// `mz_persist_gc_finished`: garbage collections finished.
    pub(crate) finished: IntCounter,
    /// `mz_persist_gc_merged_reqs`: garbage collection requests merged.
    pub(crate) merged: IntCounter,
    /// `mz_persist_gc_seconds`: total time spent in garbage collections.
    pub(crate) seconds: Counter,
    /// Per-step timings, children of `mz_persist_gc_step_seconds` (label `step`).
    pub(crate) steps: GcStepTimings,
}
978
979#[derive(Debug)]
980pub struct GcStepTimings {
981 pub(crate) find_removable_rollups: Counter,
982 pub(crate) fetch_seconds: Counter,
983 pub(crate) find_deletable_blobs_seconds: Counter,
984 pub(crate) delete_rollup_seconds: Counter,
985 pub(crate) delete_batch_part_seconds: Counter,
986 pub(crate) truncate_diff_seconds: Counter,
987 pub(crate) remove_rollups_from_state: Counter,
988 pub(crate) post_gc_calculations_seconds: Counter,
989}
990
991impl GcStepTimings {
992 fn new(step_timings: CounterVec) -> Self {
993 Self {
994 find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
995 fetch_seconds: step_timings.with_label_values(&["fetch"]),
996 find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
997 delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
998 delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
999 truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
1000 remove_rollups_from_state: step_timings
1001 .with_label_values(&["remove_rollups_from_state"]),
1002 post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
1003 }
1004 }
1005}
1006
1007impl GcMetrics {
1008 fn new(registry: &MetricsRegistry) -> Self {
1009 let step_timings: CounterVec = registry.register(metric!(
1010 name: "mz_persist_gc_step_seconds",
1011 help: "time spent on individual steps of gc",
1012 var_labels: ["step"],
1013 ));
1014 GcMetrics {
1015 noop: registry.register(metric!(
1016 name: "mz_persist_gc_noop",
1017 help: "count of garbage collections skipped because they were already done",
1018 )),
1019 started: registry.register(metric!(
1020 name: "mz_persist_gc_started",
1021 help: "count of garbage collections started",
1022 )),
1023 finished: registry.register(metric!(
1024 name: "mz_persist_gc_finished",
1025 help: "count of garbage collections finished",
1026 )),
1027 merged: registry.register(metric!(
1028 name: "mz_persist_gc_merged_reqs",
1029 help: "count of garbage collection requests merged",
1030 )),
1031 seconds: registry.register(metric!(
1032 name: "mz_persist_gc_seconds",
1033 help: "time spent in garbage collections",
1034 )),
1035 steps: GcStepTimings::new(step_timings),
1036 }
1037 }
1038}
1039
1040#[derive(Debug)]
1041pub struct LeaseMetrics {
1042 pub(crate) timeout_read: IntCounter,
1043 pub(crate) dropped_part: IntCounter,
1044}
1045
1046impl LeaseMetrics {
1047 fn new(registry: &MetricsRegistry) -> Self {
1048 LeaseMetrics {
1049 timeout_read: registry.register(metric!(
1050 name: "mz_persist_lease_timeout_read",
1051 help: "count of readers whose lease timed out",
1052 )),
1053 dropped_part: registry.register(metric!(
1054 name: "mz_persist_lease_dropped_part",
1055 help: "count of LeasedBatchParts that were dropped without being politely returned",
1056 )),
1057 }
1058 }
1059}
1060
1061struct IncOnDrop(IntCounter);
1062
1063impl Drop for IncOnDrop {
1064 fn drop(&mut self) {
1065 self.0.inc()
1066 }
1067}
1068
1069pub struct MetricsRetryStream {
1070 retry: RetryStream,
1071 pub(crate) retries: IntCounter,
1072 sleep_seconds: Counter,
1073 _finished: IncOnDrop,
1074}
1075
1076impl MetricsRetryStream {
1077 pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
1078 metrics.started.inc();
1079 MetricsRetryStream {
1080 retry,
1081 retries: metrics.retries.clone(),
1082 sleep_seconds: metrics.sleep_seconds.clone(),
1083 _finished: IncOnDrop(metrics.finished.clone()),
1084 }
1085 }
1086
1087 pub fn attempt(&self) -> usize {
1089 self.retry.attempt()
1090 }
1091
1092 pub fn next_sleep(&self) -> Duration {
1094 self.retry.next_sleep()
1095 }
1096
1097 pub async fn sleep(self) -> Self {
1102 self.retries.inc();
1103 self.sleep_seconds
1104 .inc_by(self.retry.next_sleep().as_secs_f64());
1105 let retry = self.retry.sleep().await;
1106 MetricsRetryStream {
1107 retry,
1108 retries: self.retries,
1109 sleep_seconds: self.sleep_seconds,
1110 _finished: self._finished,
1111 }
1112 }
1113}
1114
/// Encode/decode timing metrics, one [`CodecMetrics`] per kind of value, named by field.
#[derive(Debug)]
pub struct CodecsMetrics {
    // NOTE(review): the field names suggest these cover state, state diffs, batches, and the
    // user key/val codecs respectively; confirm against the call sites.
    pub(crate) state: CodecMetrics,
    pub(crate) state_diff: CodecMetrics,
    pub(crate) batch: CodecMetrics,
    pub(crate) key: CodecMetrics,
    pub(crate) val: CodecMetrics,
}
1125
1126#[derive(Debug)]
1127pub struct CodecMetrics {
1128 pub(crate) encode_count: IntCounter,
1129 pub(crate) encode_seconds: Counter,
1130 pub(crate) decode_count: IntCounter,
1131 pub(crate) decode_seconds: Counter,
1132}
1133
1134impl CodecMetrics {
1135 pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1136 let now = Instant::now();
1137 let r = f();
1138 self.encode_count.inc();
1139 self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
1140 r
1141 }
1142
1143 pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1144 let now = Instant::now();
1145 let r = f();
1146 self.decode_count.inc();
1147 self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
1148 r
1149 }
1150}
1151
/// Counters for the code paths taken while applying and maintaining shard state.
///
/// Each field is registered in `StateMetrics::new` as `mz_persist_state_<field name>`.
/// The exceptions are `force_apply_hostname` (`mz_persist_state_force_applied_hostname`) and
/// the two `rollup_write_noop_*` fields. Those two are children of
/// `mz_persist_state_rollup_write_noop`, labelled `reason=latest` / `reason=truncated`.
#[derive(Debug)]
pub struct StateMetrics {
    // Spine diff application: fast path, slow path (and its variants), flattening.
    pub(crate) apply_spine_fast_path: IntCounter,
    pub(crate) apply_spine_slow_path: IntCounter,
    pub(crate) apply_spine_slow_path_lenient: IntCounter,
    pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
    pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
    pub(crate) apply_spine_flattened: IntCounter,
    // State update application: no-op (shared state), empty, fast, and slow paths.
    pub(crate) update_state_noop_path: IntCounter,
    pub(crate) update_state_empty_path: IntCounter,
    pub(crate) update_state_fast_path: IntCounter,
    pub(crate) update_state_slow_path: IntCounter,
    // fetch_rollup_at_seqno calls that only worked because of the migration.
    pub(crate) rollup_at_seqno_migration: IntCounter,
    pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
    pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
    pub(crate) writer_added: IntCounter,
    pub(crate) writer_removed: IntCounter,
    // Times hostname diffs needed to be force applied.
    pub(crate) force_apply_hostname: IntCounter,
    // Rollups written successfully (they may not end up linked into state).
    pub(crate) rollup_write_success: IntCounter,
    pub(crate) rollup_write_noop_latest: IntCounter,
    pub(crate) rollup_write_noop_truncated: IntCounter,
}
1174
1175impl StateMetrics {
1176 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
1177 let rollup_write_noop: IntCounterVec = registry.register(metric!(
1178 name: "mz_persist_state_rollup_write_noop",
1179 help: "count of no-op rollup writes",
1180 var_labels: ["reason"],
1181 ));
1182
1183 StateMetrics {
1184 apply_spine_fast_path: registry.register(metric!(
1185 name: "mz_persist_state_apply_spine_fast_path",
1186 help: "count of spine diff applications that hit the fast path",
1187 )),
1188 apply_spine_slow_path: registry.register(metric!(
1189 name: "mz_persist_state_apply_spine_slow_path",
1190 help: "count of spine diff applications that hit the slow path",
1191 )),
1192 apply_spine_slow_path_lenient: registry.register(metric!(
1193 name: "mz_persist_state_apply_spine_slow_path_lenient",
1194 help: "count of spine diff applications that hit the lenient compaction apply path",
1195 )),
1196 apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
1197 name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
1198 help: "count of adjustments made by the lenient compaction apply path",
1199 )),
1200 apply_spine_slow_path_with_reconstruction: registry.register(metric!(
1201 name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
1202 help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
1203 )),
1204 apply_spine_flattened: registry.register(metric!(
1205 name: "mz_persist_state_apply_spine_flattened",
1206 help: "count of spine diff applications that flatten the trace",
1207 )),
1208 update_state_noop_path: registry.register(metric!(
1209 name: "mz_persist_state_update_state_noop_path",
1210 help: "count of state update applications that no-oped due to shared state",
1211 )),
1212 update_state_empty_path: registry.register(metric!(
1213 name: "mz_persist_state_update_state_empty_path",
1214 help: "count of state update applications that found no new updates",
1215 )),
1216 update_state_fast_path: registry.register(metric!(
1217 name: "mz_persist_state_update_state_fast_path",
1218 help: "count of state update applications that hit the fast path",
1219 )),
1220 update_state_slow_path: registry.register(metric!(
1221 name: "mz_persist_state_update_state_slow_path",
1222 help: "count of state update applications that hit the slow path",
1223 )),
1224 rollup_at_seqno_migration: registry.register(metric!(
1225 name: "mz_persist_state_rollup_at_seqno_migration",
1226 help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
1227 )),
1228 fetch_recent_live_diffs_fast_path: registry.register(metric!(
1229 name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
1230 help: "count of fetch_recent_live_diffs that hit the fast path",
1231 )),
1232 fetch_recent_live_diffs_slow_path: registry.register(metric!(
1233 name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
1234 help: "count of fetch_recent_live_diffs that hit the slow path",
1235 )),
1236 writer_added: registry.register(metric!(
1237 name: "mz_persist_state_writer_added",
1238 help: "count of writers added to the state",
1239 )),
1240 writer_removed: registry.register(metric!(
1241 name: "mz_persist_state_writer_removed",
1242 help: "count of writers removed from the state",
1243 )),
1244 force_apply_hostname: registry.register(metric!(
1245 name: "mz_persist_state_force_applied_hostname",
1246 help: "count of when hostname diffs needed to be force applied",
1247 )),
1248 rollup_write_success: registry.register(metric!(
1249 name: "mz_persist_state_rollup_write_success",
1250 help: "count of rollups written successful (may not be linked in to state)",
1251 )),
1252 rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
1253 rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
1254 }
1255 }
1256}
1257
/// Process-wide per-shard metric families, plus the set of live per-shard handles.
///
/// Each vec field is registered once in `ShardsMetrics::new` and is labelled by
/// `["shard", "name"]`. The two `batch_part_version_*` families additionally carry a `version`
/// label. `ShardMetrics::new` fetches the per-shard children of these families via
/// `get_delete_on_drop_metric`.
#[derive(Debug)]
pub struct ShardsMetrics {
    // Computed gauge `mz_persist_shard_count`: number of entries in `shards` whose
    // `ShardMetrics` is still alive at scrape time.
    _count: ComputedIntGauge,
    since: mz_ore::metrics::IntGaugeVec,
    upper: mz_ore::metrics::IntGaugeVec,
    encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
    encoded_diff_size: mz_ore::metrics::IntCounterVec,
    hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
    spine_batch_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    update_count: mz_ore::metrics::UIntGaugeVec,
    rollup_count: mz_ore::metrics::UIntGaugeVec,
    largest_batch_size: mz_ore::metrics::UIntGaugeVec,
    seqnos_held: mz_ore::metrics::UIntGaugeVec,
    seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
    gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
    gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
    gc_finished: mz_ore::metrics::IntCounterVec,
    compaction_applied: mz_ore::metrics::IntCounterVec,
    cmd_succeeded: mz_ore::metrics::IntCounterVec,
    usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
    pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
    stale_version: mz_ore::metrics::UIntGaugeVec,
    blob_gets: mz_ore::metrics::IntCounterVec,
    blob_sets: mz_ore::metrics::IntCounterVec,
    live_writers: mz_ore::metrics::UIntGaugeVec,
    unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
    // Backpressure families (named `mz_persist_backpressure_*` rather than `mz_persist_shard_*`).
    backpressure_emitted_bytes: IntCounterVec,
    backpressure_last_backpressured_bytes: UIntGaugeVec,
    backpressure_retired_bytes: IntCounterVec,
    rewrite_part_count: UIntGaugeVec,
    inline_part_count: UIntGaugeVec,
    inline_part_bytes: UIntGaugeVec,
    compact_batches: UIntGaugeVec,
    compacting_batches: UIntGaugeVec,
    noncompact_batches: UIntGaugeVec,
    schema_registry_version_count: UIntGaugeVec,
    inline_backpressure_count: IntCounterVec,
    // Weak handles to every `ShardMetrics` handed out by `ShardsMetrics::shard`, keyed by shard
    // id. The map holds weak refs so it does not keep a shard's metrics alive; dead entries are
    // pruned lazily by `shard` and whenever `_count` is computed.
    shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
}
1312
1313impl ShardsMetrics {
1314 fn new(registry: &MetricsRegistry) -> Self {
1315 let shards = Arc::new(Mutex::new(BTreeMap::new()));
1316 let shards_count = Arc::clone(&shards);
1317 ShardsMetrics {
1318 _count: registry.register_computed_gauge(
1319 metric!(
1320 name: "mz_persist_shard_count",
1321 help: "count of all active shards on this process",
1322 ),
1323 move || {
1324 let mut ret = 0;
1325 Self::compute(&shards_count, |_m| ret += 1);
1326 ret
1327 },
1328 ),
1329 since: registry.register(metric!(
1330 name: "mz_persist_shard_since",
1331 help: "since by shard",
1332 var_labels: ["shard", "name"],
1333 )),
1334 upper: registry.register(metric!(
1335 name: "mz_persist_shard_upper",
1336 help: "upper by shard",
1337 var_labels: ["shard", "name"],
1338 )),
1339 encoded_rollup_size: registry.register(metric!(
1340 name: "mz_persist_shard_rollup_size_bytes",
1341 help: "total encoded rollup size by shard",
1342 var_labels: ["shard", "name"],
1343 )),
1344 encoded_diff_size: registry.register(metric!(
1345 name: "mz_persist_shard_diff_size_bytes",
1346 help: "total encoded diff size by shard",
1347 var_labels: ["shard", "name"],
1348 )),
1349 hollow_batch_count: registry.register(metric!(
1350 name: "mz_persist_shard_hollow_batch_count",
1351 help: "count of hollow batches by shard",
1352 var_labels: ["shard", "name"],
1353 )),
1354 spine_batch_count: registry.register(metric!(
1355 name: "mz_persist_shard_spine_batch_count",
1356 help: "count of spine batches by shard",
1357 var_labels: ["shard", "name"],
1358 )),
1359 batch_part_count: registry.register(metric!(
1360 name: "mz_persist_shard_batch_part_count",
1361 help: "count of batch parts by shard",
1362 var_labels: ["shard", "name"],
1363 )),
1364 batch_part_version_count: registry.register(metric!(
1365 name: "mz_persist_shard_batch_part_version_count",
1366 help: "count of batch parts by shard and version",
1367 var_labels: ["shard", "name", "version"],
1368 )),
1369 batch_part_version_bytes: registry.register(metric!(
1370 name: "mz_persist_shard_batch_part_version_bytes",
1371 help: "total bytes in batch parts by shard and version",
1372 var_labels: ["shard", "name", "version"],
1373 )),
1374 update_count: registry.register(metric!(
1375 name: "mz_persist_shard_update_count",
1376 help: "count of updates by shard",
1377 var_labels: ["shard", "name"],
1378 )),
1379 rollup_count: registry.register(metric!(
1380 name: "mz_persist_shard_rollup_count",
1381 help: "count of rollups by shard",
1382 var_labels: ["shard", "name"],
1383 )),
1384 largest_batch_size: registry.register(metric!(
1385 name: "mz_persist_shard_largest_batch_size",
1386 help: "largest encoded batch size by shard",
1387 var_labels: ["shard", "name"],
1388 )),
1389 seqnos_held: registry.register(metric!(
1390 name: "mz_persist_shard_seqnos_held",
1391 help: "maximum count of gc-ineligible states by shard",
1392 var_labels: ["shard", "name"],
1393 )),
1394 seqnos_since_last_rollup: registry.register(metric!(
1395 name: "mz_persist_shard_seqnos_since_last_rollup",
1396 help: "count of seqnos since last rollup",
1397 var_labels: ["shard", "name"],
1398 )),
1399 gc_seqno_held_parts: registry.register(metric!(
1400 name: "mz_persist_shard_gc_seqno_held_parts",
1401 help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
1402 var_labels: ["shard", "name"],
1403 )),
1404 gc_live_diffs: registry.register(metric!(
1405 name: "mz_persist_shard_gc_live_diffs",
1406 help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
1407 var_labels: ["shard", "name"],
1408 )),
1409 gc_finished: registry.register(metric!(
1410 name: "mz_persist_shard_gc_finished",
1411 help: "count of garbage collections finished by shard",
1412 var_labels: ["shard", "name"],
1413 )),
1414 compaction_applied: registry.register(metric!(
1415 name: "mz_persist_shard_compaction_applied",
1416 help: "count of compactions applied to state by shard",
1417 var_labels: ["shard", "name"],
1418 )),
1419 cmd_succeeded: registry.register(metric!(
1420 name: "mz_persist_shard_cmd_succeeded",
1421 help: "count of commands succeeded by shard",
1422 var_labels: ["shard", "name"],
1423 )),
1424 usage_current_state_batches_bytes: registry.register(metric!(
1425 name: "mz_persist_shard_usage_current_state_batches_bytes",
1426 help: "data in batches/parts referenced by current version of state",
1427 var_labels: ["shard", "name"],
1428 )),
1429 usage_current_state_rollups_bytes: registry.register(metric!(
1430 name: "mz_persist_shard_usage_current_state_rollups_bytes",
1431 help: "data in rollups referenced by current version of state",
1432 var_labels: ["shard", "name"],
1433 )),
1434 usage_referenced_not_current_state_bytes: registry.register(metric!(
1435 name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
1436 help: "data referenced only by a previous version of state",
1437 var_labels: ["shard", "name"],
1438 )),
1439 usage_not_leaked_not_referenced_bytes: registry.register(metric!(
1440 name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
1441 help: "data written by an active writer but not referenced by any version of state",
1442 var_labels: ["shard", "name"],
1443 )),
1444 usage_leaked_bytes: registry.register(metric!(
1445 name: "mz_persist_shard_usage_leaked_bytes",
1446 help: "data reclaimable by a leaked blob detector",
1447 var_labels: ["shard", "name"],
1448 )),
1449 pubsub_push_diff_applied: registry.register(metric!(
1450 name: "mz_persist_shard_pubsub_diff_applied",
1451 help: "number of diffs received via pubsub that applied",
1452 var_labels: ["shard", "name"],
1453 )),
1454 pubsub_push_diff_not_applied_stale: registry.register(metric!(
1455 name: "mz_persist_shard_pubsub_diff_not_applied_stale",
1456 help: "number of diffs received via pubsub that did not apply due to staleness",
1457 var_labels: ["shard", "name"],
1458 )),
1459 pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
1460 name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
1461 help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
1462 var_labels: ["shard", "name"],
1463 )),
1464 stale_version: registry.register(metric!(
1465 name: "mz_persist_shard_stale_version",
1466 help: "indicates whether the current version of the shard is less than the current version of the code",
1467 var_labels: ["shard", "name"],
1468 )),
1469 blob_gets: registry.register(metric!(
1470 name: "mz_persist_shard_blob_gets",
1471 help: "number of Blob::get calls for this shard",
1472 var_labels: ["shard", "name"],
1473 )),
1474 blob_sets: registry.register(metric!(
1475 name: "mz_persist_shard_blob_sets",
1476 help: "number of Blob::set calls for this shard",
1477 var_labels: ["shard", "name"],
1478 )),
1479 live_writers: registry.register(metric!(
1480 name: "mz_persist_shard_live_writers",
1481 help: "number of writers that have recently appended updates to this shard",
1482 var_labels: ["shard", "name"],
1483 )),
1484 unconsolidated_snapshot: registry.register(metric!(
1485 name: "mz_persist_shard_unconsolidated_snapshot",
1486 help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
1487 var_labels: ["shard", "name"],
1488 )),
1489 backpressure_emitted_bytes: registry.register(metric!(
1490 name: "mz_persist_backpressure_emitted_bytes",
1491 help: "A counter with the number of emitted bytes.",
1492 var_labels: ["shard", "name"],
1493 )),
1494 backpressure_last_backpressured_bytes: registry.register(metric!(
1495 name: "mz_persist_backpressure_last_backpressured_bytes",
1496 help: "The last count of bytes we are waiting to be retired in \
1497 the operator. This cannot be directly compared to \
1498 `retired_bytes`, but CAN indicate that backpressure is happening.",
1499 var_labels: ["shard", "name"],
1500 )),
1501 backpressure_retired_bytes: registry.register(metric!(
1502 name: "mz_persist_backpressure_retired_bytes",
1503 help:"A counter with the number of bytes retired by downstream processing.",
1504 var_labels: ["shard", "name"],
1505 )),
1506 rewrite_part_count: registry.register(metric!(
1507 name: "mz_persist_shard_rewrite_part_count",
1508 help: "count of batch parts with rewrites by shard",
1509 var_labels: ["shard", "name"],
1510 )),
1511 inline_part_count: registry.register(metric!(
1512 name: "mz_persist_shard_inline_part_count",
1513 help: "count of parts inline in shard metadata",
1514 var_labels: ["shard", "name"],
1515 )),
1516 inline_part_bytes: registry.register(metric!(
1517 name: "mz_persist_shard_inline_part_bytes",
1518 help: "total size of parts inline in shard metadata",
1519 var_labels: ["shard", "name"],
1520 )),
1521 compact_batches: registry.register(metric!(
1522 name: "mz_persist_shard_compact_batches",
1523 help: "number of fully compact batches in the shard",
1524 var_labels: ["shard", "name"],
1525 )),
1526 compacting_batches: registry.register(metric!(
1527 name: "mz_persist_shard_compacting_batches",
1528 help: "number of batches in the shard with compactions in progress",
1529 var_labels: ["shard", "name"],
1530 )),
1531 noncompact_batches: registry.register(metric!(
1532 name: "mz_persist_shard_noncompact_batches",
1533 help: "number of batches in the shard that aren't compact and have no ongoing compaction",
1534 var_labels: ["shard", "name"],
1535 )),
1536 schema_registry_version_count: registry.register(metric!(
1537 name: "mz_persist_shard_schema_registry_version_count",
1538 help: "count of versions in the schema registry",
1539 var_labels: ["shard", "name"],
1540 )),
1541 inline_backpressure_count: registry.register(metric!(
1542 name: "mz_persist_shard_inline_backpressure_count",
1543 help: "count of CaA attempts retried because of inline backpressure",
1544 var_labels: ["shard", "name"],
1545 )),
1546 shards,
1547 }
1548 }
1549
1550 pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
1551 let mut shards = self.shards.lock().expect("mutex poisoned");
1552 if let Some(shard) = shards.get(shard_id) {
1553 if let Some(shard) = shard.upgrade() {
1554 return Arc::clone(&shard);
1555 } else {
1556 assert!(shards.remove(shard_id).is_some());
1557 }
1558 }
1559 let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
1560 assert!(
1561 shards
1562 .insert(shard_id.clone(), Arc::downgrade(&shard))
1563 .is_none()
1564 );
1565 shard
1566 }
1567
1568 fn compute<F: FnMut(&ShardMetrics)>(
1569 shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
1570 mut f: F,
1571 ) {
1572 let mut shards = shards.lock().expect("mutex poisoned");
1573 let mut deleted_shards = Vec::new();
1574 for (shard_id, metrics) in shards.iter() {
1575 if let Some(metrics) = metrics.upgrade() {
1576 f(&metrics);
1577 } else {
1578 deleted_shards.push(shard_id.clone());
1579 }
1580 }
1581 for deleted_shard_id in deleted_shards {
1582 assert!(shards.remove(&deleted_shard_id).is_some());
1583 }
1584 }
1585}
1586
/// Metrics for a single shard, handed out (and shared) by `ShardsMetrics::shard`.
///
/// Most fields are children of a `ShardsMetrics` family, fetched with the labels
/// `[shard_id, name]` via `get_delete_on_drop_metric`. Judging by the `DeleteOnDrop*` types,
/// each series is presumably removed from the registry when this struct is dropped; confirm in
/// `mz_ore::metrics`.
#[derive(Debug)]
pub struct ShardMetrics {
    pub shard_id: ShardId,
    pub name: String,
    pub since: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub largest_batch_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Child of the `encoded_rollup_size` family (`mz_persist_shard_rollup_size_bytes`).
    pub latest_rollup_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub encoded_diff_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub hollow_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub spine_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub batch_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Unlike the other fields, these keep clones of the whole families (they carry an extra
    // `version` label) plus a lazily populated per-version map. NOTE(review): the map is filled
    // elsewhere in this impl; check there for how version children are created/retired.
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
    pub update_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub rollup_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_held: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_since_last_rollup: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_seqno_held_parts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_live_diffs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Storage usage breakdown.
    pub usage_current_state_batches_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_current_state_rollups_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_referenced_not_current_state_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_leaked_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_finished: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub compaction_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub cmd_succeeded: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_not_applied_out_of_order: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub stale_version: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub blob_gets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub blob_sets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub live_writers: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub unconsolidated_snapshot: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    // Wrapped in `Arc` so they can be shared beyond this struct (e.g. with an operator).
    pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    pub backpressure_last_backpressured_bytes: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
    pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    pub rewrite_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_part_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub compact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub compacting_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub noncompact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub schema_registry_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_backpressure_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
1636
1637impl ShardMetrics {
1638 pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
1639 let shard = shard_id.to_string();
1640 ShardMetrics {
1641 shard_id: *shard_id,
1642 name: name.to_string(),
1643 since: shards_metrics
1644 .since
1645 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1646 upper: shards_metrics
1647 .upper
1648 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1649 latest_rollup_size: shards_metrics
1650 .encoded_rollup_size
1651 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1652 encoded_diff_size: shards_metrics
1653 .encoded_diff_size
1654 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1655 hollow_batch_count: shards_metrics
1656 .hollow_batch_count
1657 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1658 spine_batch_count: shards_metrics
1659 .spine_batch_count
1660 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1661 batch_part_count: shards_metrics
1662 .batch_part_count
1663 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1664 batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
1665 batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
1666 batch_part_version_map: Mutex::new(BTreeMap::new()),
1667 update_count: shards_metrics
1668 .update_count
1669 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1670 rollup_count: shards_metrics
1671 .rollup_count
1672 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1673 largest_batch_size: shards_metrics
1674 .largest_batch_size
1675 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1676 seqnos_held: shards_metrics
1677 .seqnos_held
1678 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1679 seqnos_since_last_rollup: shards_metrics
1680 .seqnos_since_last_rollup
1681 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1682 gc_seqno_held_parts: shards_metrics
1683 .gc_seqno_held_parts
1684 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1685 gc_live_diffs: shards_metrics
1686 .gc_live_diffs
1687 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1688 gc_finished: shards_metrics
1689 .gc_finished
1690 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1691 compaction_applied: shards_metrics
1692 .compaction_applied
1693 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1694 cmd_succeeded: shards_metrics
1695 .cmd_succeeded
1696 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1697 usage_current_state_batches_bytes: shards_metrics
1698 .usage_current_state_batches_bytes
1699 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1700 usage_current_state_rollups_bytes: shards_metrics
1701 .usage_current_state_rollups_bytes
1702 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1703 usage_referenced_not_current_state_bytes: shards_metrics
1704 .usage_referenced_not_current_state_bytes
1705 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1706 usage_not_leaked_not_referenced_bytes: shards_metrics
1707 .usage_not_leaked_not_referenced_bytes
1708 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1709 usage_leaked_bytes: shards_metrics
1710 .usage_leaked_bytes
1711 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1712 pubsub_push_diff_applied: shards_metrics
1713 .pubsub_push_diff_applied
1714 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1715 pubsub_push_diff_not_applied_stale: shards_metrics
1716 .pubsub_push_diff_not_applied_stale
1717 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1718 pubsub_push_diff_not_applied_out_of_order: shards_metrics
1719 .pubsub_push_diff_not_applied_out_of_order
1720 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1721 stale_version: shards_metrics
1722 .stale_version
1723 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1724 blob_gets: shards_metrics
1725 .blob_gets
1726 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1727 blob_sets: shards_metrics
1728 .blob_sets
1729 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1730 live_writers: shards_metrics
1731 .live_writers
1732 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1733 unconsolidated_snapshot: shards_metrics
1734 .unconsolidated_snapshot
1735 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1736 backpressure_emitted_bytes: Arc::new(
1737 shards_metrics
1738 .backpressure_emitted_bytes
1739 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1740 ),
1741 backpressure_last_backpressured_bytes: Arc::new(
1742 shards_metrics
1743 .backpressure_last_backpressured_bytes
1744 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1745 ),
1746 backpressure_retired_bytes: Arc::new(
1747 shards_metrics
1748 .backpressure_retired_bytes
1749 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1750 ),
1751 rewrite_part_count: shards_metrics
1752 .rewrite_part_count
1753 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1754 inline_part_count: shards_metrics
1755 .inline_part_count
1756 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1757 inline_part_bytes: shards_metrics
1758 .inline_part_bytes
1759 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1760 compact_batches: shards_metrics
1761 .compact_batches
1762 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1763 compacting_batches: shards_metrics
1764 .compacting_batches
1765 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1766 noncompact_batches: shards_metrics
1767 .noncompact_batches
1768 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1769 schema_registry_version_count: shards_metrics
1770 .schema_registry_version_count
1771 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1772 inline_backpressure_count: shards_metrics
1773 .inline_backpressure_count
1774 .get_delete_on_drop_metric(vec![shard, name.to_string()]),
1775 }
1776 }
1777
1778 pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
1779 self.since.set(encode_ts_metric(since))
1780 }
1781
1782 pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
1783 self.upper.set(encode_ts_metric(upper))
1784 }
1785
1786 pub(crate) fn set_batch_part_versions<'a>(
1787 &self,
1788 batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
1789 ) {
1790 let mut map = self
1791 .batch_part_version_map
1792 .lock()
1793 .expect("mutex should not be poisoned");
1794 for x in map.values() {
1801 x.batch_part_version_count.set(0);
1802 x.batch_part_version_bytes.set(0);
1803 }
1804
1805 for (key, bytes) in batch_parts_by_version {
1808 if !map.contains_key(key) {
1809 map.insert(
1810 key.to_owned(),
1811 BatchPartVersionMetrics {
1812 batch_part_version_count: self
1813 .batch_part_version_count
1814 .get_delete_on_drop_metric(vec![
1815 self.shard_id.to_string(),
1816 self.name.clone(),
1817 key.to_owned(),
1818 ]),
1819 batch_part_version_bytes: self
1820 .batch_part_version_bytes
1821 .get_delete_on_drop_metric(vec![
1822 self.shard_id.to_string(),
1823 self.name.clone(),
1824 key.to_owned(),
1825 ]),
1826 },
1827 );
1828 }
1829 let value = map.get(key).expect("inserted above");
1830 value.batch_part_version_count.inc();
1831 value.batch_part_version_bytes.add(u64::cast_from(bytes));
1832 }
1833 }
1834}
1835
/// Gauges for one batch part version of one shard. Instances are created and
/// updated by `ShardMetrics::set_batch_part_versions`, labeled with
/// `[shard_id, name, version]`.
#[derive(Debug)]
pub struct BatchPartVersionMetrics {
    /// Number of batch parts reported with this version in the latest update.
    pub batch_part_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Sum of the byte sizes reported for this version in the latest update.
    pub batch_part_version_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
1841
1842#[derive(Debug)]
1844pub struct UsageAuditMetrics {
1845 pub blob_batch_part_bytes: UIntGauge,
1847 pub blob_batch_part_count: UIntGauge,
1849 pub blob_rollup_bytes: UIntGauge,
1851 pub blob_rollup_count: UIntGauge,
1853 pub blob_bytes: UIntGauge,
1855 pub blob_count: UIntGauge,
1857 pub step_blob_metadata: Counter,
1859 pub step_state: Counter,
1861 pub step_math: Counter,
1863}
1864
1865impl UsageAuditMetrics {
1866 fn new(registry: &MetricsRegistry) -> Self {
1867 let step_timings: CounterVec = registry.register(metric!(
1868 name: "mz_persist_audit_step_seconds",
1869 help: "time spent on individual steps of audit",
1870 var_labels: ["step"],
1871 ));
1872 UsageAuditMetrics {
1873 blob_batch_part_bytes: registry.register(metric!(
1874 name: "mz_persist_audit_blob_batch_part_bytes",
1875 help: "total size of batch parts in blob",
1876 )),
1877 blob_batch_part_count: registry.register(metric!(
1878 name: "mz_persist_audit_blob_batch_part_count",
1879 help: "count of batch parts in blob",
1880 )),
1881 blob_rollup_bytes: registry.register(metric!(
1882 name: "mz_persist_audit_blob_rollup_bytes",
1883 help: "total size of state rollups stored in blob",
1884 )),
1885 blob_rollup_count: registry.register(metric!(
1886 name: "mz_persist_audit_blob_rollup_count",
1887 help: "count of all state rollups in blob",
1888 )),
1889 blob_bytes: registry.register(metric!(
1890 name: "mz_persist_audit_blob_bytes",
1891 help: "total size of blob",
1892 )),
1893 blob_count: registry.register(metric!(
1894 name: "mz_persist_audit_blob_count",
1895 help: "count of all blobs",
1896 )),
1897 step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
1898 step_state: step_timings.with_label_values(&["state"]),
1899 step_math: step_timings.with_label_values(&["math"]),
1900 }
1901 }
1902}
1903
1904#[derive(Debug)]
1907pub enum UpdateDelta {
1908 Negative(u64),
1910 NonNegative(u64),
1912}
1913
1914impl UpdateDelta {
1915 pub fn new(new: usize, old: usize) -> Self {
1918 if new < old {
1919 UpdateDelta::Negative(CastFrom::cast_from(old - new))
1920 } else {
1921 UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
1922 }
1923 }
1924}
1925
1926#[derive(Debug, Clone)]
1929pub struct SinkMetrics {
1930 correction_insertions_total: IntCounter,
1932 correction_deletions_total: IntCounter,
1934 correction_capacity_increases_total: IntCounter,
1936 correction_capacity_decreases_total: IntCounter,
1938 correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
1940 correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
1942}
1943
1944impl SinkMetrics {
1945 fn new(registry: &MetricsRegistry) -> Self {
1946 SinkMetrics {
1947 correction_insertions_total: registry.register(metric!(
1948 name: "mz_persist_sink_correction_insertions_total",
1949 help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
1950 )),
1951 correction_deletions_total: registry.register(metric!(
1952 name: "mz_persist_sink_correction_deletions_total",
1953 help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
1954 )),
1955 correction_capacity_increases_total: registry.register(metric!(
1956 name: "mz_persist_sink_correction_capacity_increases_total",
1957 help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
1958 )),
1959 correction_capacity_decreases_total: registry.register(metric!(
1960 name: "mz_persist_sink_correction_capacity_decreases_total",
1961 help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
1962 )),
1963 correction_max_per_sink_worker_len_updates: registry.register(metric!(
1964 name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
1965 help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
1966 var_labels: ["worker_id"],
1967 )),
1968 correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
1969 name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
1970 help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
1971 var_labels: ["worker_id"],
1972 )),
1973 }
1974 }
1975
1976 pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
1981 let worker = worker_id.to_string();
1982 let correction_max_per_sink_worker_len_updates = self
1983 .correction_max_per_sink_worker_len_updates
1984 .with_label_values(&[&worker]);
1985 let correction_max_per_sink_worker_capacity_updates = self
1986 .correction_max_per_sink_worker_capacity_updates
1987 .with_label_values(&[&worker]);
1988 SinkWorkerMetrics {
1989 correction_max_per_sink_worker_len_updates,
1990 correction_max_per_sink_worker_capacity_updates,
1991 }
1992 }
1993
1994 pub fn report_correction_update_deltas(
2000 &self,
2001 correction_len_delta: UpdateDelta,
2002 correction_cap_delta: UpdateDelta,
2003 ) {
2004 match correction_len_delta {
2006 UpdateDelta::NonNegative(delta) => {
2007 if delta > 0 {
2008 self.correction_insertions_total.inc_by(delta)
2009 }
2010 }
2011 UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
2012 }
2013 match correction_cap_delta {
2015 UpdateDelta::NonNegative(delta) => {
2016 if delta > 0 {
2017 self.correction_capacity_increases_total.inc_by(delta)
2018 }
2019 }
2020 UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
2021 }
2022 }
2023}
2024
2025#[derive(Clone, Debug)]
2027pub struct SinkWorkerMetrics {
2028 correction_max_per_sink_worker_len_updates: UIntGauge,
2029 correction_max_per_sink_worker_capacity_updates: UIntGauge,
2030}
2031
2032impl SinkWorkerMetrics {
2033 pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
2038 let correction_len = CastFrom::cast_from(correction_len);
2040 if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
2041 self.correction_max_per_sink_worker_len_updates
2042 .set(correction_len);
2043 }
2044 let correction_cap = CastFrom::cast_from(correction_cap);
2045 if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
2046 self.correction_max_per_sink_worker_capacity_updates
2047 .set(correction_cap);
2048 }
2049 }
2050}
2051
2052#[derive(Debug)]
2054pub struct AlertsMetrics {
2055 pub(crate) blob_failures: IntCounter,
2056 pub(crate) consensus_failures: IntCounter,
2057}
2058
2059impl AlertsMetrics {
2060 fn new(registry: &MetricsRegistry) -> Self {
2061 AlertsMetrics {
2062 blob_failures: registry.register(metric!(
2063 name: "mz_persist_blob_failures",
2064 help: "count of all blob operation failures",
2065 const_labels: {"honeycomb" => "import"},
2066 )),
2067 consensus_failures: registry.register(metric!(
2068 name: "mz_persist_consensus_failures",
2069 help: "count of determinate consensus operation failures",
2070 const_labels: {"honeycomb" => "import"},
2071 )),
2072 }
2073 }
2074}
2075
2076#[derive(Debug)]
2078pub struct PubSubServerMetrics {
2079 pub(crate) active_connections: UIntGauge,
2080 pub(crate) broadcasted_diff_count: IntCounter,
2081 pub(crate) broadcasted_diff_bytes: IntCounter,
2082 pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,
2083
2084 pub(crate) push_seconds: Counter,
2085 pub(crate) subscribe_seconds: Counter,
2086 pub(crate) unsubscribe_seconds: Counter,
2087 pub(crate) connection_cleanup_seconds: Counter,
2088
2089 pub(crate) push_call_count: IntCounter,
2090 pub(crate) subscribe_call_count: IntCounter,
2091 pub(crate) unsubscribe_call_count: IntCounter,
2092}
2093
2094impl PubSubServerMetrics {
2095 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
2096 let op_timings: CounterVec = registry.register(metric!(
2097 name: "mz_persist_pubsub_server_operation_seconds",
2098 help: "time spent in pubsub server performing each operation",
2099 var_labels: ["op"],
2100 ));
2101 let call_count: IntCounterVec = registry.register(metric!(
2102 name: "mz_persist_pubsub_server_call_count",
2103 help: "count of each pubsub server message received",
2104 var_labels: ["call"],
2105 ));
2106
2107 Self {
2108 active_connections: registry.register(metric!(
2109 name: "mz_persist_pubsub_server_active_connections",
2110 help: "number of active connections to server",
2111 )),
2112 broadcasted_diff_count: registry.register(metric!(
2113 name: "mz_persist_pubsub_server_broadcasted_diff_count",
2114 help: "count of total broadcast diff messages sent",
2115 )),
2116 broadcasted_diff_bytes: registry.register(metric!(
2117 name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
2118 help: "count of total broadcast diff bytes sent",
2119 )),
2120 broadcasted_diff_dropped_channel_full: registry.register(metric!(
2121 name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
2122 help: "count of diffs dropped due to full connection channel",
2123 )),
2124
2125 push_seconds: op_timings.with_label_values(&["push"]),
2126 subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
2127 unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
2128 connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),
2129
2130 push_call_count: call_count.with_label_values(&["push"]),
2131 subscribe_call_count: call_count.with_label_values(&["subscribe"]),
2132 unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
2133 }
2134 }
2135}
2136
2137#[derive(Debug)]
2139pub struct PubSubClientMetrics {
2140 pub sender: PubSubClientSenderMetrics,
2141 pub receiver: PubSubClientReceiverMetrics,
2142 pub grpc_connection: PubSubGrpcClientConnectionMetrics,
2143}
2144
2145impl PubSubClientMetrics {
2146 fn new(registry: &MetricsRegistry) -> Self {
2147 PubSubClientMetrics {
2148 sender: PubSubClientSenderMetrics::new(registry),
2149 receiver: PubSubClientReceiverMetrics::new(registry),
2150 grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
2151 }
2152 }
2153}
2154
2155#[derive(Debug)]
2156pub struct PubSubGrpcClientConnectionMetrics {
2157 pub(crate) connected: UIntGauge,
2158 pub(crate) connection_established_count: IntCounter,
2159 pub(crate) connect_call_attempt_count: IntCounter,
2160 pub(crate) broadcast_recv_lagged_count: IntCounter,
2161 pub(crate) grpc_error_count: IntCounter,
2162}
2163
2164impl PubSubGrpcClientConnectionMetrics {
2165 fn new(registry: &MetricsRegistry) -> Self {
2166 Self {
2167 connected: registry.register(metric!(
2168 name: "mz_persist_pubsub_client_grpc_connected",
2169 help: "whether the grpc client is currently connected",
2170 )),
2171 connection_established_count: registry.register(metric!(
2172 name: "mz_persist_pubsub_client_grpc_connection_established_count",
2173 help: "count of grpc connection establishments to pubsub server",
2174 )),
2175 connect_call_attempt_count: registry.register(metric!(
2176 name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
2177 help: "count of connection call attempts (including retries) to pubsub server",
2178 )),
2179 broadcast_recv_lagged_count: registry.register(metric!(
2180 name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
2181 help: "times a message was missed by broadcast receiver due to lag",
2182 )),
2183 grpc_error_count: registry.register(metric!(
2184 name: "mz_persist_pubsub_client_grpc_error_count",
2185 help: "count of grpc errors received",
2186 )),
2187 }
2188 }
2189}
2190
2191#[derive(Clone, Debug)]
2192pub struct PubSubClientReceiverMetrics {
2193 pub(crate) push_received: IntCounter,
2194 pub(crate) unknown_message_received: IntCounter,
2195 pub(crate) approx_diff_latency_seconds: Histogram,
2196
2197 pub(crate) state_pushed_diff_fast_path: IntCounter,
2198 pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
2199 pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
2200}
2201
2202impl PubSubClientReceiverMetrics {
2203 fn new(registry: &MetricsRegistry) -> Self {
2204 let call_received: IntCounterVec = registry.register(metric!(
2205 name: "mz_persist_pubsub_client_call_received",
2206 help: "times a pubsub client call was received",
2207 var_labels: ["call"],
2208 ));
2209
2210 Self {
2211 push_received: call_received.with_label_values(&["push"]),
2212 unknown_message_received: call_received.with_label_values(&["unknown"]),
2213 approx_diff_latency_seconds: registry.register(metric!(
2214 name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
2215 help: "histogram of (approximate) latency between sending a diff and applying it",
2216 buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
2217 )),
2218
2219 state_pushed_diff_fast_path: registry.register(metric!(
2220 name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
2221 help: "count fast-path state push_diff calls",
2222 )),
2223 state_pushed_diff_slow_path_succeeded: registry.register(metric!(
2224 name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
2225 help: "count of successful slow-path state push_diff calls",
2226 )),
2227 state_pushed_diff_slow_path_failed: registry.register(metric!(
2228 name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
2229 help: "count of unsuccessful slow-path state push_diff calls",
2230 )),
2231 }
2232 }
2233}
2234
2235#[derive(Debug)]
2236pub struct PubSubClientSenderMetrics {
2237 pub push: PubSubClientCallMetrics,
2238 pub subscribe: PubSubClientCallMetrics,
2239 pub unsubscribe: PubSubClientCallMetrics,
2240}
2241
2242#[derive(Debug)]
2243pub struct PubSubClientCallMetrics {
2244 pub(crate) succeeded: IntCounter,
2245 pub(crate) bytes_sent: IntCounter,
2246 pub(crate) failed: IntCounter,
2247}
2248
2249impl PubSubClientSenderMetrics {
2250 fn new(registry: &MetricsRegistry) -> Self {
2251 let call_bytes_sent: IntCounterVec = registry.register(metric!(
2252 name: "mz_persist_pubsub_client_call_bytes_sent",
2253 help: "number of bytes sent for a given pubsub client call",
2254 var_labels: ["call"],
2255 ));
2256 let call_succeeded: IntCounterVec = registry.register(metric!(
2257 name: "mz_persist_pubsub_client_call_succeeded",
2258 help: "times a pubsub client call succeeded",
2259 var_labels: ["call"],
2260 ));
2261 let call_failed: IntCounterVec = registry.register(metric!(
2262 name: "mz_persist_pubsub_client_call_failed",
2263 help: "times a pubsub client call failed",
2264 var_labels: ["call"],
2265 ));
2266
2267 Self {
2268 push: PubSubClientCallMetrics {
2269 succeeded: call_succeeded.with_label_values(&["push"]),
2270 failed: call_failed.with_label_values(&["push"]),
2271 bytes_sent: call_bytes_sent.with_label_values(&["push"]),
2272 },
2273 subscribe: PubSubClientCallMetrics {
2274 succeeded: call_succeeded.with_label_values(&["subscribe"]),
2275 failed: call_failed.with_label_values(&["subscribe"]),
2276 bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
2277 },
2278 unsubscribe: PubSubClientCallMetrics {
2279 succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
2280 failed: call_failed.with_label_values(&["unsubscribe"]),
2281 bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
2282 },
2283 }
2284 }
2285}
2286
/// One [`LockMetrics`] per instrumented lock usage. The field names suggest
/// the state applier's read (cacheable / non-cacheable) and write paths plus
/// the state watch; the call sites are not visible here (NOTE(review): confirm).
#[derive(Debug)]
pub struct LocksMetrics {
    pub(crate) applier_read_cacheable: LockMetrics,
    pub(crate) applier_read_noncacheable: LockMetrics,
    pub(crate) applier_write: LockMetrics,
    pub(crate) watch: LockMetrics,
}
2294
/// Counters for a single instrumented lock. Presumably (the names suggest
/// this; the update sites are not visible here):
/// - `acquire_count`: every acquisition,
/// - `blocking_acquire_count`: acquisitions that had to wait,
/// - `blocking_seconds`: total time spent waiting.
#[derive(Debug, Clone)]
pub struct LockMetrics {
    pub(crate) acquire_count: IntCounter,
    pub(crate) blocking_acquire_count: IntCounter,
    pub(crate) blocking_seconds: Counter,
}
2301
2302#[derive(Debug)]
2303pub struct WatchMetrics {
2304 pub(crate) listen_woken_via_watch: IntCounter,
2305 pub(crate) listen_woken_via_sleep: IntCounter,
2306 pub(crate) listen_resolved_via_watch: IntCounter,
2307 pub(crate) listen_resolved_via_sleep: IntCounter,
2308 pub(crate) snapshot_woken_via_watch: IntCounter,
2309 pub(crate) snapshot_woken_via_sleep: IntCounter,
2310 pub(crate) notify_sent: IntCounter,
2311 pub(crate) notify_noop: IntCounter,
2312 pub(crate) notify_recv: IntCounter,
2313 pub(crate) notify_lagged: IntCounter,
2314 pub(crate) notify_wait_started: IntCounter,
2315 pub(crate) notify_wait_finished: IntCounter,
2316}
2317
2318impl WatchMetrics {
2319 fn new(registry: &MetricsRegistry) -> Self {
2320 WatchMetrics {
2321 listen_woken_via_watch: registry.register(metric!(
2322 name: "mz_persist_listen_woken_via_watch",
2323 help: "count of listen next batches wakes via watch notify",
2324 )),
2325 listen_woken_via_sleep: registry.register(metric!(
2326 name: "mz_persist_listen_woken_via_sleep",
2327 help: "count of listen next batches wakes via sleep",
2328 )),
2329 listen_resolved_via_watch: registry.register(metric!(
2330 name: "mz_persist_listen_resolved_via_watch",
2331 help: "count of listen next batches resolved via watch notify",
2332 )),
2333 listen_resolved_via_sleep: registry.register(metric!(
2334 name: "mz_persist_listen_resolved_via_sleep",
2335 help: "count of listen next batches resolved via sleep",
2336 )),
2337 snapshot_woken_via_watch: registry.register(metric!(
2338 name: "mz_persist_snapshot_woken_via_watch",
2339 help: "count of snapshot wakes via watch notify",
2340 )),
2341 snapshot_woken_via_sleep: registry.register(metric!(
2342 name: "mz_persist_snapshot_woken_via_sleep",
2343 help: "count of snapshot wakes via sleep",
2344 )),
2345 notify_sent: registry.register(metric!(
2346 name: "mz_persist_watch_notify_sent",
2347 help: "count of watch notifications sent to a non-empty broadcast channel",
2348 )),
2349 notify_noop: registry.register(metric!(
2350 name: "mz_persist_watch_notify_noop",
2351 help: "count of watch notifications sent to an broadcast channel",
2352 )),
2353 notify_recv: registry.register(metric!(
2354 name: "mz_persist_watch_notify_recv",
2355 help: "count of watch notifications received from the broadcast channel",
2356 )),
2357 notify_lagged: registry.register(metric!(
2358 name: "mz_persist_watch_notify_lagged",
2359 help: "count of lagged events in the watch notification broadcast channel",
2360 )),
2361 notify_wait_started: registry.register(metric!(
2362 name: "mz_persist_watch_notify_wait_started",
2363 help: "count of watch wait calls started",
2364 )),
2365 notify_wait_finished: registry.register(metric!(
2366 name: "mz_persist_watch_notify_wait_finished",
2367 help: "count of watch wait calls resolved",
2368 )),
2369 }
2370 }
2371}
2372
2373#[derive(Debug)]
2374pub struct PushdownMetrics {
2375 pub(crate) parts_filtered_count: IntCounter,
2376 pub(crate) parts_filtered_bytes: IntCounter,
2377 pub(crate) parts_fetched_count: IntCounter,
2378 pub(crate) parts_fetched_bytes: IntCounter,
2379 pub(crate) parts_audited_count: IntCounter,
2380 pub(crate) parts_audited_bytes: IntCounter,
2381 pub(crate) parts_inline_count: IntCounter,
2382 pub(crate) parts_inline_bytes: IntCounter,
2383 pub(crate) parts_faked_count: IntCounter,
2384 pub(crate) parts_faked_bytes: IntCounter,
2385 pub(crate) parts_stats_trimmed_count: IntCounter,
2386 pub(crate) parts_stats_trimmed_bytes: IntCounter,
2387 pub(crate) parts_projection_trimmed_bytes: IntCounter,
2388 pub part_stats: PartStatsMetrics,
2389}
2390
2391impl PushdownMetrics {
2392 fn new(registry: &MetricsRegistry) -> Self {
2393 PushdownMetrics {
2394 parts_filtered_count: registry.register(metric!(
2395 name: "mz_persist_pushdown_parts_filtered_count",
2396 help: "count of parts filtered by pushdown",
2397 )),
2398 parts_filtered_bytes: registry.register(metric!(
2399 name: "mz_persist_pushdown_parts_filtered_bytes",
2400 help: "total size of parts filtered by pushdown in bytes",
2401 )),
2402 parts_fetched_count: registry.register(metric!(
2403 name: "mz_persist_pushdown_parts_fetched_count",
2404 help: "count of parts not filtered by pushdown",
2405 )),
2406 parts_fetched_bytes: registry.register(metric!(
2407 name: "mz_persist_pushdown_parts_fetched_bytes",
2408 help: "total size of parts not filtered by pushdown in bytes",
2409 )),
2410 parts_audited_count: registry.register(metric!(
2411 name: "mz_persist_pushdown_parts_audited_count",
2412 help: "count of parts fetched only for pushdown audit",
2413 )),
2414 parts_audited_bytes: registry.register(metric!(
2415 name: "mz_persist_pushdown_parts_audited_bytes",
2416 help: "total size of parts fetched only for pushdown audit",
2417 )),
2418 parts_inline_count: registry.register(metric!(
2419 name: "mz_persist_pushdown_parts_inline_count",
2420 help: "count of parts not fetched because they were inline",
2421 )),
2422 parts_inline_bytes: registry.register(metric!(
2423 name: "mz_persist_pushdown_parts_inline_bytes",
2424 help: "total size of parts not fetched because they were inline",
2425 )),
2426 parts_faked_count: registry.register(metric!(
2427 name: "mz_persist_pushdown_parts_faked_count",
2428 help: "count of parts faked because of aggressive projection pushdown",
2429 )),
2430 parts_faked_bytes: registry.register(metric!(
2431 name: "mz_persist_pushdown_parts_faked_bytes",
2432 help: "total size of parts replaced with fakes by aggressive projection pushdown",
2433 )),
2434 parts_stats_trimmed_count: registry.register(metric!(
2435 name: "mz_persist_pushdown_parts_stats_trimmed_count",
2436 help: "count of trimmed part stats",
2437 )),
2438 parts_stats_trimmed_bytes: registry.register(metric!(
2439 name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
2440 help: "total bytes trimmed from part stats",
2441 )),
2442 parts_projection_trimmed_bytes: registry.register(metric!(
2443 name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
2444 help: "total bytes trimmed from columnar data because of projection pushdown",
2445 )),
2446 part_stats: PartStatsMetrics::new(registry),
2447 }
2448 }
2449}
2450
2451#[derive(Debug)]
2452pub struct ConsolidationMetrics {
2453 pub(crate) parts_fetched: IntCounter,
2454 pub(crate) parts_skipped: IntCounter,
2455 pub(crate) parts_wasted: IntCounter,
2456 pub(crate) wrong_sort: IntCounter,
2457}
2458
2459impl ConsolidationMetrics {
2460 fn new(registry: &MetricsRegistry) -> Self {
2461 ConsolidationMetrics {
2462 parts_fetched: registry.register(metric!(
2463 name: "mz_persist_consolidation_parts_fetched_count",
2464 help: "count of parts that were fetched and used during consolidation",
2465 )),
2466 parts_skipped: registry.register(metric!(
2467 name: "mz_persist_consolidation_parts_skipped_count",
2468 help: "count of parts that were never needed during consolidation",
2469 )),
2470 parts_wasted: registry.register(metric!(
2471 name: "mz_persist_consolidation_parts_wasted_count",
2472 help: "count of parts that were fetched but not needed during consolidation",
2473 )),
2474 wrong_sort: registry.register(metric!(
2475 name: "mz_persist_consolidation_wrong_sort_count",
2476 help: "count of runs that were sorted using the wrong ordering for the current consolidation",
2477 )),
2478 }
2479 }
2480}
2481
2482#[derive(Debug)]
2483pub struct BlobMemCache {
2484 pub(crate) size_blobs: UIntGauge,
2485 pub(crate) size_bytes: UIntGauge,
2486 pub(crate) hits_blobs: IntCounter,
2487 pub(crate) hits_bytes: IntCounter,
2488 pub(crate) evictions: IntCounter,
2489}
2490
2491impl BlobMemCache {
2492 fn new(registry: &MetricsRegistry) -> Self {
2493 BlobMemCache {
2494 size_blobs: registry.register(metric!(
2495 name: "mz_persist_blob_cache_size_blobs",
2496 help: "count of blobs in the cache",
2497 const_labels: {"cache" => "mem"},
2498 )),
2499 size_bytes: registry.register(metric!(
2500 name: "mz_persist_blob_cache_size_bytes",
2501 help: "total size of blobs in the cache",
2502 const_labels: {"cache" => "mem"},
2503 )),
2504 hits_blobs: registry.register(metric!(
2505 name: "mz_persist_blob_cache_hits_blobs",
2506 help: "count of blobs served via cache instead of s3",
2507 const_labels: {"cache" => "mem"},
2508 )),
2509 hits_bytes: registry.register(metric!(
2510 name: "mz_persist_blob_cache_hits_bytes",
2511 help: "total size of blobs served via cache instead of s3",
2512 const_labels: {"cache" => "mem"},
2513 )),
2514 evictions: registry.register(metric!(
2515 name: "mz_persist_blob_cache_evictions",
2516 help: "count of capacity-based cache evictions",
2517 const_labels: {"cache" => "mem"},
2518 )),
2519 }
2520 }
2521}
2522
2523#[derive(Debug)]
2524pub struct SemaphoreMetrics {
2525 cfg: PersistConfig,
2526 registry: MetricsRegistry,
2527 fetch: OnceCell<MetricsSemaphore>,
2528}
2529
2530impl SemaphoreMetrics {
2531 fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
2532 SemaphoreMetrics {
2533 cfg,
2534 registry,
2535 fetch: OnceCell::new(),
2536 }
2537 }
2538
2539 async fn fetch(&self) -> &MetricsSemaphore {
2543 if let Some(x) = self.fetch.get() {
2544 return x;
2546 }
2547 let cfg = self.cfg.clone();
2548 let registry = self.registry.clone();
2549 let init = async move {
2550 let total_permits = match cfg.announce_memory_limit {
2551 Some(mem) if cfg.is_cc_active => {
2554 info!("fetch semaphore awaiting first dyncfg values");
2557 let () = cfg.configs_synced_once().await;
2558 let total_permits = usize::cast_lossy(
2559 f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
2560 );
2561 info!("fetch_semaphore got first dyncfg values");
2562 total_permits
2563 }
2564 Some(_) | None => Semaphore::MAX_PERMITS,
2565 };
2566 MetricsSemaphore::new(®istry, "fetch", total_permits)
2567 };
2568 self.fetch.get_or_init(|| init).await
2569 }
2570
2571 pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
2572 let requested_permits = f64::cast_lossy(encoded_size_bytes);
2575 let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
2576 let requested_permits = usize::cast_lossy(requested_permits);
2577 self.fetch().await.acquire_permits(requested_permits).await
2578 }
2579}
2580
2581#[derive(Debug)]
2582pub struct MetricsSemaphore {
2583 name: &'static str,
2584 semaphore: Arc<Semaphore>,
2585 total_permits: usize,
2586 acquire_count: IntCounter,
2587 blocking_count: IntCounter,
2588 blocking_seconds: Counter,
2589 acquired_permits: IntCounter,
2590 released_permits: IntCounter,
2591 _available_permits: ComputedUIntGauge,
2592}
2593
2594impl MetricsSemaphore {
2595 pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
2596 let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
2597 let semaphore = Arc::new(Semaphore::new(total_permits));
2600 MetricsSemaphore {
2601 name,
2602 total_permits,
2603 acquire_count: registry.register(metric!(
2604 name: "mz_persist_semaphore_acquire_count",
2605 help: "count of acquire calls (not acquired permits count)",
2606 const_labels: {"name" => name},
2607 )),
2608 blocking_count: registry.register(metric!(
2609 name: "mz_persist_semaphore_blocking_count",
2610 help: "count of acquire calls that had to block",
2611 const_labels: {"name" => name},
2612 )),
2613 blocking_seconds: registry.register(metric!(
2614 name: "mz_persist_semaphore_blocking_seconds",
2615 help: "total time spent blocking on permit acquisition",
2616 const_labels: {"name" => name},
2617 )),
2618 acquired_permits: registry.register(metric!(
2619 name: "mz_persist_semaphore_acquired_permits",
2620 help: "total sum of acquired permits",
2621 const_labels: {"name" => name},
2622 )),
2623 released_permits: registry.register(metric!(
2624 name: "mz_persist_semaphore_released_permits",
2625 help: "total sum of released permits",
2626 const_labels: {"name" => name},
2627 )),
2628 _available_permits: registry.register_computed_gauge(
2629 metric!(
2630 name: "mz_persist_semaphore_available_permits",
2631 help: "currently available permits according to the semaphore",
2632 ),
2633 {
2634 let semaphore = Arc::clone(&semaphore);
2635 move || u64::cast_from(semaphore.available_permits())
2636 },
2637 ),
2638 semaphore,
2639 }
2640 }
2641
2642 pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
2643 let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
2646 let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
2647 let requested_permits = std::cmp::min(requested_permits, total_permits);
2648 let wrap = |_permit| {
2649 self.acquired_permits.inc_by(u64::from(requested_permits));
2650 MetricsPermits {
2651 _permit,
2652 released_metric: self.released_permits.clone(),
2653 count: requested_permits,
2654 }
2655 };
2656
2657 self.acquire_count.inc();
2659 match Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
2660 Ok(x) => return wrap(x),
2661 Err(_) => {}
2662 };
2663
2664 self.blocking_count.inc();
2666 let start = Instant::now();
2667 let ret = Arc::clone(&self.semaphore)
2668 .acquire_many_owned(requested_permits)
2669 .instrument(info_span!("acquire_permits"))
2670 .await;
2671 let elapsed = start.elapsed();
2672 self.blocking_seconds.inc_by(elapsed.as_secs_f64());
2673 debug!(
2674 "acquisition of {} {} permits blocked for {:?}",
2675 self.name, requested_permits, elapsed
2676 );
2677 wrap(ret.expect("semaphore is never closed"))
2678 }
2679}
2680
/// Permits acquired through `MetricsSemaphore::acquire_permits`.
///
/// Dropping this value increments the released-permits counter by `count`
/// (see the `Drop` impl). It also drops the inner [`OwnedSemaphorePermit`],
/// which hands the permits back to the semaphore.
#[derive(Debug)]
pub struct MetricsPermits {
    /// Kept for its drop behavior; the leading underscore marks it as
    /// otherwise unused.
    _permit: OwnedSemaphorePermit,
    /// Counter incremented by `count` when this value is dropped.
    released_metric: IntCounter,
    /// Number of permits represented by `_permit`.
    count: u32,
}
2687
2688impl Drop for MetricsPermits {
2689 fn drop(&mut self) {
2690 self.released_metric.inc_by(u64::from(self.count))
2691 }
2692}
2693
/// Metrics for one kind of operation against external storage (blob or
/// consensus). They are recorded by `ExternalOpMetrics::run_op` and
/// `ExternalOpMetrics::run_stream`.
#[derive(Debug)]
pub struct ExternalOpMetrics {
    /// Incremented each time an op starts.
    started: IntCounter,
    /// Incremented when an op finishes without error.
    succeeded: IntCounter,
    /// Incremented when an op finishes with an error (for streams: when any
    /// yielded item was an error).
    failed: IntCounter,
    /// Payload bytes. Incremented by the wrappers that call `run_op` (e.g.
    /// `MetricsBlob`, `MetricsConsensus`), not by `run_op`/`run_stream`.
    bytes: IntCounter,
    /// Cumulative wall-clock seconds spent in ops.
    seconds: Counter,
    /// Optional latency histogram, observed alongside `seconds`.
    seconds_histogram: Option<Histogram>,
    /// Handed to the per-op error callback when an op fails.
    alerts_metrics: Arc<AlertsMetrics>,
}
2704
2705impl ExternalOpMetrics {
2706 async fn run_op<R, F, OpFn, ErrFn>(
2707 &self,
2708 op_fn: OpFn,
2709 on_err_fn: ErrFn,
2710 ) -> Result<R, ExternalError>
2711 where
2712 F: std::future::Future<Output = Result<R, ExternalError>>,
2713 OpFn: FnOnce() -> F,
2714 ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
2715 {
2716 self.started.inc();
2717 let start = Instant::now();
2718 let res = op_fn().await;
2719 let elapsed_seconds = start.elapsed().as_secs_f64();
2720 self.seconds.inc_by(elapsed_seconds);
2721 if let Some(h) = &self.seconds_histogram {
2722 h.observe(elapsed_seconds);
2723 }
2724 match res.as_ref() {
2725 Ok(_) => self.succeeded.inc(),
2726 Err(err) => {
2727 self.failed.inc();
2728 on_err_fn(&self.alerts_metrics, err);
2729 }
2730 };
2731 res
2732 }
2733
2734 fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
2735 &'a self,
2736 op_fn: OpFn,
2737 mut on_err_fn: ErrFn,
2738 ) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
2739 where
2740 S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
2741 OpFn: FnOnce() -> S,
2742 ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
2743 {
2744 self.started.inc();
2745 let start = Instant::now();
2746 let mut stream = op_fn();
2747 stream! {
2748 let mut succeeded = true;
2749 while let Some(res) = stream.next().await {
2750 if let Err(err) = res.as_ref() {
2751 on_err_fn(&self.alerts_metrics, err);
2752 succeeded = false;
2753 }
2754 yield res;
2755 }
2756 if succeeded {
2757 self.succeeded.inc()
2758 } else {
2759 self.failed.inc()
2760 }
2761 let elapsed_seconds = start.elapsed().as_secs_f64();
2762 self.seconds.inc_by(elapsed_seconds);
2763 if let Some(h) = &self.seconds_histogram {
2764 h.observe(elapsed_seconds);
2765 }
2766 }
2767 }
2768}
2769
/// Metrics recorded by `MetricsBlob` for each `Blob` operation.
#[derive(Debug)]
pub struct BlobMetrics {
    /// Metrics for `Blob::set`.
    set: ExternalOpMetrics,
    /// Metrics for `Blob::get`.
    get: ExternalOpMetrics,
    /// Metrics for `Blob::list_keys_and_metadata`.
    list_keys: ExternalOpMetrics,
    /// Metrics for `Blob::delete`.
    delete: ExternalOpMetrics,
    /// Metrics for `Blob::restore`.
    restore: ExternalOpMetrics,
    /// Deletes for which the inner blob reported no byte count (`None`).
    delete_noop: IntCounter,
    /// Sizes, in bytes, of values successfully written via `set`.
    blob_sizes: Histogram,
    /// Not written in this section of the file. NOTE(review): presumably a
    /// round-trip latency measurement updated elsewhere — confirm units.
    pub rtt_latency: Gauge,
}
2781
/// A `Blob` wrapper that records per-operation metrics and failure alerts for
/// every call it forwards to the inner blob.
#[derive(Debug)]
pub struct MetricsBlob {
    /// The blob that actually services requests.
    blob: Arc<dyn Blob>,
    /// Where the recorded metrics go (the `blob` and alerts metrics).
    metrics: Arc<Metrics>,
}
2787
2788impl MetricsBlob {
2789 pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
2790 MetricsBlob { blob, metrics }
2791 }
2792
2793 fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
2794 alerts_metrics.blob_failures.inc()
2795 }
2796}
2797
2798#[async_trait]
2799impl Blob for MetricsBlob {
2800 #[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
2801 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
2802 let res = self
2803 .metrics
2804 .blob
2805 .get
2806 .run_op(|| self.blob.get(key), Self::on_err)
2807 .await;
2808 if let Ok(Some(value)) = res.as_ref() {
2809 self.metrics
2810 .blob
2811 .get
2812 .bytes
2813 .inc_by(u64::cast_from(value.len()));
2814 }
2815 res
2816 }
2817
2818 #[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
2819 async fn list_keys_and_metadata(
2820 &self,
2821 key_prefix: &str,
2822 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
2823 ) -> Result<(), ExternalError> {
2824 let mut byte_total = 0;
2825 let mut instrumented = |blob_metadata: BlobMetadata| {
2826 byte_total += blob_metadata.key.len();
2829 f(blob_metadata)
2830 };
2831
2832 let res = self
2833 .metrics
2834 .blob
2835 .list_keys
2836 .run_op(
2837 || {
2838 self.blob
2839 .list_keys_and_metadata(key_prefix, &mut instrumented)
2840 },
2841 Self::on_err,
2842 )
2843 .await;
2844
2845 self.metrics
2846 .blob
2847 .list_keys
2848 .bytes
2849 .inc_by(u64::cast_from(byte_total));
2850
2851 res
2852 }
2853
2854 #[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
2855 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
2856 let bytes = value.len();
2857 let res = self
2858 .metrics
2859 .blob
2860 .set
2861 .run_op(|| self.blob.set(key, value), Self::on_err)
2862 .await;
2863 if res.is_ok() {
2864 self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
2865 self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
2866 }
2867 res
2868 }
2869
2870 #[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
2871 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
2872 let bytes = self
2873 .metrics
2874 .blob
2875 .delete
2876 .run_op(|| self.blob.delete(key), Self::on_err)
2877 .await?;
2878 if let Some(bytes) = bytes {
2879 self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
2880 } else {
2881 self.metrics.blob.delete_noop.inc();
2882 }
2883 Ok(bytes)
2884 }
2885
2886 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
2887 self.metrics
2888 .blob
2889 .restore
2890 .run_op(|| self.blob.restore(key), Self::on_err)
2891 .await
2892 }
2893}
2894
/// Metrics recorded by `MetricsConsensus` for each `Consensus` operation.
#[derive(Debug)]
pub struct ConsensusMetrics {
    /// Metrics for `Consensus::list_keys` (the whole stream counts as one op).
    list_keys: ExternalOpMetrics,
    /// Metrics for `Consensus::head`.
    head: ExternalOpMetrics,
    /// Metrics for `Consensus::compare_and_set`; bytes count committed writes only.
    compare_and_set: ExternalOpMetrics,
    /// Metrics for `Consensus::scan`.
    scan: ExternalOpMetrics,
    /// Metrics for `Consensus::truncate`.
    truncate: ExternalOpMetrics,
    /// Running total of the deletion counts returned by successful truncates.
    truncated_count: IntCounter,
    /// Not written in this section of the file. NOTE(review): presumably a
    /// round-trip latency measurement updated elsewhere — confirm units.
    pub rtt_latency: Gauge,
}
2905
/// A `Consensus` wrapper that records per-operation metrics and failure alerts
/// for every call it forwards to the inner consensus implementation.
#[derive(Debug)]
pub struct MetricsConsensus {
    /// The consensus implementation that actually services requests.
    consensus: Arc<dyn Consensus>,
    /// Where the recorded metrics go (the `consensus` and alerts metrics).
    metrics: Arc<Metrics>,
}
2911
2912impl MetricsConsensus {
2913 pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
2914 MetricsConsensus { consensus, metrics }
2915 }
2916
2917 fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
2918 if let ExternalError::Indeterminate(_) = err {
2922 alerts_metrics.consensus_failures.inc()
2923 }
2924 }
2925}
2926
2927#[async_trait]
2928impl Consensus for MetricsConsensus {
2929 fn list_keys(&self) -> ResultStream<'_, String> {
2930 Box::pin(
2931 self.metrics
2932 .consensus
2933 .list_keys
2934 .run_stream(|| self.consensus.list_keys(), Self::on_err),
2935 )
2936 }
2937
2938 #[instrument(name = "consensus::head", fields(shard=key))]
2939 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
2940 let res = self
2941 .metrics
2942 .consensus
2943 .head
2944 .run_op(|| self.consensus.head(key), Self::on_err)
2945 .await;
2946 if let Ok(Some(data)) = res.as_ref() {
2947 self.metrics
2948 .consensus
2949 .head
2950 .bytes
2951 .inc_by(u64::cast_from(data.data.len()));
2952 }
2953 res
2954 }
2955
2956 #[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
2957 async fn compare_and_set(
2958 &self,
2959 key: &str,
2960 expected: Option<SeqNo>,
2961 new: VersionedData,
2962 ) -> Result<CaSResult, ExternalError> {
2963 let bytes = new.data.len();
2964 let res = self
2965 .metrics
2966 .consensus
2967 .compare_and_set
2968 .run_op(
2969 || self.consensus.compare_and_set(key, expected, new),
2970 Self::on_err,
2971 )
2972 .await;
2973 match res.as_ref() {
2974 Ok(CaSResult::Committed) => self
2975 .metrics
2976 .consensus
2977 .compare_and_set
2978 .bytes
2979 .inc_by(u64::cast_from(bytes)),
2980 Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
2981 }
2982 res
2983 }
2984
2985 #[instrument(name = "consensus::scan", fields(shard=key))]
2986 async fn scan(
2987 &self,
2988 key: &str,
2989 from: SeqNo,
2990 limit: usize,
2991 ) -> Result<Vec<VersionedData>, ExternalError> {
2992 let res = self
2993 .metrics
2994 .consensus
2995 .scan
2996 .run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
2997 .await;
2998 if let Ok(dataz) = res.as_ref() {
2999 let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
3000 self.metrics
3001 .consensus
3002 .scan
3003 .bytes
3004 .inc_by(u64::cast_from(bytes));
3005 }
3006 res
3007 }
3008
3009 #[instrument(name = "consensus::truncate", fields(shard=key))]
3010 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
3011 let deleted = self
3012 .metrics
3013 .consensus
3014 .truncate
3015 .run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
3016 .await?;
3017 self.metrics
3018 .consensus
3019 .truncated_count
3020 .inc_by(u64::cast_from(deleted));
3021 Ok(deleted)
3022 }
3023}
3024
/// Exposes cumulative `TaskMonitor` statistics as Prometheus gauges.
///
/// Tasks wrapped with `TaskMetrics::instrument_task` are tracked by `monitor`.
/// Each collection copies the monitor's cumulative totals into the gauges.
#[derive(Debug, Clone)]
pub struct TaskMetrics {
    /// `f64` gauges, each paired with the function that reads its value from
    /// a monitor snapshot.
    f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
    /// `u64` gauges, each paired with the function that reads its value from
    /// a monitor snapshot.
    u64_gauges: Vec<(
        GenericGauge<AtomicU64>,
        fn(&tokio_metrics::TaskMetrics) -> u64,
    )>,
    /// Monitor that instrumented tasks report into.
    monitor: TaskMonitor,
}
3036
3037impl TaskMetrics {
3038 pub fn new(name: &str) -> Self {
3039 let monitor = TaskMonitor::new();
3040 Self {
3041 f64_gauges: vec![
3042 (
3043 Gauge::make_collector(metric!(
3044 name: "mz_persist_task_total_idle_duration",
3045 help: "Seconds of time spent idling, ie. waiting for a task to be woken up.",
3046 const_labels: {"name" => name}
3047 )),
3048 |m| m.total_idle_duration.as_secs_f64(),
3049 ),
3050 (
3051 Gauge::make_collector(metric!(
3052 name: "mz_persist_task_total_scheduled_duration",
3053 help: "Seconds of time spent scheduled, ie. ready to poll but not yet polled.",
3054 const_labels: {"name" => name}
3055 )),
3056 |m| m.total_scheduled_duration.as_secs_f64(),
3057 ),
3058 ],
3059 u64_gauges: vec![
3060 (
3061 MakeCollector::make_collector(metric!(
3062 name: "mz_persist_task_total_scheduled_count",
3063 help: "The total number of task schedules. Useful for computing the average scheduled time.",
3064 const_labels: {"name" => name}
3065 )),
3066 |m| m.total_scheduled_count,
3067 ),
3068 (
3069 MakeCollector::make_collector(metric!(
3070 name: "mz_persist_task_total_idled_count",
3071 help: "The total number of task idles. Useful for computing the average idle time.",
3072 const_labels: {"name" => name}
3073 ,
3074 )),
3075 |m| m.total_idled_count,
3076 ),
3077 ],
3078 monitor,
3079 }
3080 }
3081
3082 pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
3085 TaskMonitor::instrument(&self.monitor, task)
3086 }
3087}
3088
3089impl Collector for TaskMetrics {
3090 fn desc(&self) -> Vec<&Desc> {
3091 let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3092 for (g, _) in &self.f64_gauges {
3093 descs.extend(g.desc());
3094 }
3095 for (g, _) in &self.u64_gauges {
3096 descs.extend(g.desc());
3097 }
3098 descs
3099 }
3100
3101 fn collect(&self) -> Vec<MetricFamily> {
3102 let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3103 let metrics = self.monitor.cumulative();
3104 for (g, metrics_fn) in &self.f64_gauges {
3105 g.set(metrics_fn(&metrics));
3106 families.extend(g.collect());
3107 }
3108 for (g, metrics_fn) in &self.u64_gauges {
3109 g.set(metrics_fn(&metrics));
3110 families.extend(g.collect());
3111 }
3112 families
3113 }
3114}
3115
/// Per-task `TaskMetrics`, each registered with the metrics registry as a
/// collector.
#[derive(Debug)]
pub struct TasksMetrics {
    /// Metrics for the task monitor labelled `heartbeat_read`.
    pub heartbeat_read: TaskMetrics,
}
3120
3121impl TasksMetrics {
3122 fn new(registry: &MetricsRegistry) -> Self {
3123 let heartbeat_read = TaskMetrics::new("heartbeat_read");
3124 registry.register_collector(heartbeat_read.clone());
3125 TasksMetrics { heartbeat_read }
3126 }
3127}
3128
/// Metrics for the schema cache and for schema migrations of fetched parts.
#[derive(Debug)]
pub struct SchemaMetrics {
    /// Count of state fetches performed by the schema cache.
    pub(crate) cache_fetch_state_count: IntCounter,
    /// Cache counters with `op="schema"`.
    pub(crate) cache_schema: SchemaCacheMetrics,
    /// Cache counters with `op="migration"`.
    pub(crate) cache_migration: SchemaCacheMetrics,
    /// Fetch part migrations, `op="same"`.
    pub(crate) migration_count_same: IntCounter,
    /// Fetch part migrations, `op="codec"`.
    pub(crate) migration_count_codec: IntCounter,
    /// Fetch part migrations, `op="either"`.
    pub(crate) migration_count_either: IntCounter,
    /// Migrated update records, `op="legacy_codec"`.
    pub(crate) migration_len_legacy_codec: IntCounter,
    /// Migrated update records, `op="either_codec"`.
    pub(crate) migration_len_either_codec: IntCounter,
    /// Migrated update records, `op="either_arrow"`.
    pub(crate) migration_len_either_arrow: IntCounter,
    /// Count of migrations constructed.
    pub(crate) migration_new_count: IntCounter,
    /// Seconds spent constructing migration logic.
    pub(crate) migration_new_seconds: Counter,
    /// Seconds spent applying migration logic.
    pub(crate) migration_migrate_seconds: Counter,
}
3144
3145impl SchemaMetrics {
3146 fn new(registry: &MetricsRegistry) -> Self {
3147 let cached: IntCounterVec = registry.register(metric!(
3148 name: "mz_persist_schema_cache_cached_count",
3149 help: "count of schema cache entries served from cache",
3150 var_labels: ["op"],
3151 ));
3152 let computed: IntCounterVec = registry.register(metric!(
3153 name: "mz_persist_schema_cache_computed_count",
3154 help: "count of schema cache entries computed",
3155 var_labels: ["op"],
3156 ));
3157 let unavailable: IntCounterVec = registry.register(metric!(
3158 name: "mz_persist_schema_cache_unavailable_count",
3159 help: "count of schema cache entries unavailable at current state",
3160 var_labels: ["op"],
3161 ));
3162 let added: IntCounterVec = registry.register(metric!(
3163 name: "mz_persist_schema_cache_added_count",
3164 help: "count of schema cache entries added",
3165 var_labels: ["op"],
3166 ));
3167 let dropped: IntCounterVec = registry.register(metric!(
3168 name: "mz_persist_schema_cache_dropped_count",
3169 help: "count of schema cache entries dropped",
3170 var_labels: ["op"],
3171 ));
3172 let cache = |name| SchemaCacheMetrics {
3173 cached_count: cached.with_label_values(&[name]),
3174 computed_count: computed.with_label_values(&[name]),
3175 unavailable_count: unavailable.with_label_values(&[name]),
3176 added_count: added.with_label_values(&[name]),
3177 dropped_count: dropped.with_label_values(&[name]),
3178 };
3179 let migration_count: IntCounterVec = registry.register(metric!(
3180 name: "mz_persist_schema_migration_count",
3181 help: "count of fetch part migrations",
3182 var_labels: ["op"],
3183 ));
3184 let migration_len: IntCounterVec = registry.register(metric!(
3185 name: "mz_persist_schema_migration_len",
3186 help: "count of migrated update records",
3187 var_labels: ["op"],
3188 ));
3189 SchemaMetrics {
3190 cache_fetch_state_count: registry.register(metric!(
3191 name: "mz_persist_schema_cache_fetch_state_count",
3192 help: "count of state fetches by the schema cache",
3193 )),
3194 cache_schema: cache("schema"),
3195 cache_migration: cache("migration"),
3196 migration_count_same: migration_count.with_label_values(&["same"]),
3197 migration_count_codec: migration_count.with_label_values(&["codec"]),
3198 migration_count_either: migration_count.with_label_values(&["either"]),
3199 migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
3200 migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
3201 migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
3202 migration_new_count: registry.register(metric!(
3203 name: "mz_persist_schema_migration_new_count",
3204 help: "count of migrations constructed",
3205 )),
3206 migration_new_seconds: registry.register(metric!(
3207 name: "mz_persist_schema_migration_new_seconds",
3208 help: "seconds spent constructing migration logic",
3209 )),
3210 migration_migrate_seconds: registry.register(metric!(
3211 name: "mz_persist_schema_migration_migrate_seconds",
3212 help: "seconds spent applying migration logic",
3213 )),
3214 }
3215 }
3216}
3217
/// Counters for one schema cache, all sharing the same `op` label value.
#[derive(Debug, Clone)]
pub struct SchemaCacheMetrics {
    /// Entries served from the cache.
    pub(crate) cached_count: IntCounter,
    /// Entries computed.
    pub(crate) computed_count: IntCounter,
    /// Entries unavailable at the current state.
    pub(crate) unavailable_count: IntCounter,
    /// Entries added.
    pub(crate) added_count: IntCounter,
    /// Entries dropped.
    pub(crate) dropped_count: IntCounter,
}
3226
/// Metrics for parts that are committed inline into state.
#[derive(Debug)]
pub struct InlineMetrics {
    /// Count of inline parts committed to state.
    pub(crate) part_commit_count: IntCounter,
    /// Total size of inline parts committed to state.
    pub(crate) part_commit_bytes: IntCounter,
    /// Batch write metrics registered under the `inline_backpressure` name.
    pub(crate) backpressure: BatchWriteMetrics,
}
3233
3234impl InlineMetrics {
3235 fn new(registry: &MetricsRegistry) -> Self {
3236 InlineMetrics {
3237 part_commit_count: registry.register(metric!(
3238 name: "mz_persist_inline_part_commit_count",
3239 help: "count of inline parts committed to state",
3240 )),
3241 part_commit_bytes: registry.register(metric!(
3242 name: "mz_persist_inline_part_commit_bytes",
3243 help: "total size of of inline parts committed to state",
3244 )),
3245 backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
3246 }
3247 }
3248}
3249
3250fn blob_key_shard_id(key: &str) -> Option<String> {
3251 let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
3252 Some(shard_id.to_string())
3253}
3254
3255pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
3257 match ts.elements().first() {
3267 Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
3268 None => i64::MAX,
3269 }
3270}