1use async_stream::stream;
13use mz_persist_types::stats::PartStatsMetrics;
14use std::collections::BTreeMap;
15use std::sync::{Arc, Mutex, Weak};
16use std::time::{Duration, Instant};
17use tokio::sync::{OnceCell, OwnedSemaphorePermit, Semaphore};
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures_util::StreamExt;
22use mz_ore::bytes::SegmentedBytes;
23use mz_ore::cast::{CastFrom, CastLossy};
24use mz_ore::instrument;
25use mz_ore::metric;
26use mz_ore::metrics::{
27 ComputedGauge, ComputedIntGauge, ComputedUIntGauge, Counter, DeleteOnDropCounter,
28 DeleteOnDropGauge, IntCounter, MakeCollector, MetricVecExt, MetricsRegistry, UIntGauge,
29 UIntGaugeVec, raw,
30};
31use mz_ore::stats::histogram_seconds_buckets;
32use mz_persist::location::{
33 Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData,
34};
35use mz_persist::metrics::{ColumnarMetrics, S3BlobMetrics};
36use mz_persist::retry::RetryStream;
37use mz_persist_types::Codec64;
38use mz_postgres_client::metrics::PostgresClientMetrics;
39use prometheus::core::{AtomicI64, AtomicU64, Collector, Desc, GenericGauge};
40use prometheus::proto::MetricFamily;
41use prometheus::{CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounterVec};
42use timely::progress::Antichain;
43use tokio_metrics::TaskMonitor;
44use tracing::{Instrument, debug, info, info_span};
45
46use crate::fetch::{FETCH_SEMAPHORE_COST_ADJUSTMENT, FETCH_SEMAPHORE_PERMIT_ADJUSTMENT};
47use crate::internal::paths::BlobKey;
48use crate::{PersistConfig, ShardId};
49
/// Top-level collection of persist metrics.
///
/// Every field is populated by [`Metrics::new`] from a single
/// [`MetricsRegistry`].
pub struct Metrics {
    // Labeled metric vectors from which many of the per-label handles below
    // (blob, consensus, cmds, retries, codecs, read, locks) are derived.
    _vecs: MetricsVecs,
    // Computed gauge `mz_persist_metadata_seconds`: seconds elapsed since
    // `Metrics::new`, with build metadata attached as const labels.
    _uptime: ComputedGauge,

    /// Metrics for blob operations (set/get/list_keys/delete/restore).
    pub blob: BlobMetrics,
    /// Metrics for consensus operations (list_keys/head/cas/scan/truncate).
    pub consensus: ConsensusMetrics,
    /// Per-command metrics.
    pub cmds: CmdsMetrics,
    /// Metrics for retry loops.
    pub retries: RetriesMetrics,
    /// Batch-write metrics registered with the `user` name prefix.
    pub user: BatchWriteMetrics,
    /// Batch-part read metrics, one set per read path.
    pub read: BatchPartReadMetrics,
    /// Compaction metrics.
    pub compaction: CompactionMetrics,
    /// Garbage-collection metrics.
    pub gc: GcMetrics,
    /// Lease metrics.
    pub lease: LeaseMetrics,
    /// Encode/decode metrics per codec.
    pub codecs: CodecsMetrics,
    /// State metrics.
    pub state: StateMetrics,
    /// Shard metrics.
    pub shards: ShardsMetrics,
    /// Usage-audit metrics.
    pub audit: UsageAuditMetrics,
    /// Lock acquisition metrics.
    pub locks: LocksMetrics,
    /// Watch metrics.
    pub watch: WatchMetrics,
    /// PubSub client metrics.
    pub pubsub_client: PubSubClientMetrics,
    /// Pushdown metrics.
    pub pushdown: PushdownMetrics,
    /// Consolidation metrics.
    pub consolidation: ConsolidationMetrics,
    /// In-memory blob cache metrics.
    pub blob_cache_mem: BlobMemCache,
    /// Task metrics.
    pub tasks: TasksMetrics,
    /// Columnar metrics.
    pub columnar: ColumnarMetrics,
    /// Schema metrics.
    pub schema: SchemaMetrics,
    /// Inline-write metrics.
    pub inline: InlineMetrics,
    /// Semaphore metrics; constructed from clones of the config and registry.
    pub(crate) semaphore: SemaphoreMetrics,

    /// Sink metrics.
    pub sink: SinkMetrics,

    /// Metrics for the S3 blob implementation.
    pub s3_blob: S3BlobMetrics,
    /// Metrics for the Postgres consensus client, registered under the
    /// `mz_persist` prefix.
    pub postgres_consensus: PostgresClientMetrics,

    // Handle to the registry these metrics were registered in.
    // NOTE(review): `dead_code` is allowed presumably because not every build
    // reads this field — confirm.
    #[allow(dead_code)]
    pub(crate) registry: MetricsRegistry,
}
119
120impl std::fmt::Debug for Metrics {
121 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
122 f.debug_struct("Metrics").finish_non_exhaustive()
123 }
124}
125
126impl Metrics {
127 pub fn new(cfg: &PersistConfig, registry: &MetricsRegistry) -> Self {
129 let vecs = MetricsVecs::new(registry);
130 let start = Instant::now();
131 let uptime = registry.register_computed_gauge(
132 metric!(
133 name: "mz_persist_metadata_seconds",
134 help: "server uptime, labels are build metadata",
135 const_labels: {
136 "version" => cfg.build_version,
137 "build_type" => if cfg!(release) { "release" } else { "debug" }
138 },
139 ),
140 move || start.elapsed().as_secs_f64(),
141 );
142 let s3_blob = S3BlobMetrics::new(registry);
143 let columnar = ColumnarMetrics::new(registry);
144 Metrics {
145 blob: vecs.blob_metrics(),
146 consensus: vecs.consensus_metrics(),
147 cmds: vecs.cmds_metrics(registry),
148 retries: vecs.retries_metrics(),
149 codecs: vecs.codecs_metrics(),
150 user: BatchWriteMetrics::new(registry, "user"),
151 read: vecs.batch_part_read_metrics(),
152 compaction: CompactionMetrics::new(registry),
153 gc: GcMetrics::new(registry),
154 lease: LeaseMetrics::new(registry),
155 state: StateMetrics::new(registry),
156 shards: ShardsMetrics::new(registry),
157 audit: UsageAuditMetrics::new(registry),
158 locks: vecs.locks_metrics(),
159 watch: WatchMetrics::new(registry),
160 pubsub_client: PubSubClientMetrics::new(registry),
161 pushdown: PushdownMetrics::new(registry),
162 consolidation: ConsolidationMetrics::new(registry),
163 blob_cache_mem: BlobMemCache::new(registry),
164 tasks: TasksMetrics::new(registry),
165 columnar,
166 schema: SchemaMetrics::new(registry),
167 inline: InlineMetrics::new(registry),
168 semaphore: SemaphoreMetrics::new(cfg.clone(), registry.clone()),
169 sink: SinkMetrics::new(registry),
170 s3_blob,
171 postgres_consensus: PostgresClientMetrics::new(registry, "mz_persist"),
172 _vecs: vecs,
173 _uptime: uptime,
174 registry: registry.clone(),
175 }
176 }
177
178 pub fn write_amplification(&self) -> f64 {
183 let total_written = self.blob.set.bytes.get();
186 let user_written = self.user.goodbytes.get();
187 #[allow(clippy::as_conversions)]
188 {
189 total_written as f64 / user_written as f64
190 }
191 }
192}
193
/// Labeled metric vectors shared across several persist subsystems.
///
/// The helper methods on this type (`cmd_metrics`, `retry_metrics`,
/// `external_op_metrics`, ...) derive per-label handles from these vectors.
#[derive(Debug)]
struct MetricsVecs {
    // Per-command vectors, labeled by `cmd`.
    cmd_started: IntCounterVec,
    cmd_cas_mismatch: IntCounterVec,
    cmd_succeeded: IntCounterVec,
    cmd_failed: IntCounterVec,
    cmd_seconds: CounterVec,

    // External service call vectors labeled by `op`, plus unlabeled
    // counters/histogram. `external_rtt_latency` is labeled by `external`
    // (`blob` or `consensus`).
    external_op_started: IntCounterVec,
    external_op_succeeded: IntCounterVec,
    external_op_failed: IntCounterVec,
    external_op_bytes: IntCounterVec,
    external_op_seconds: CounterVec,
    external_consensus_truncated_count: IntCounter,
    external_blob_delete_noop_count: IntCounter,
    external_blob_sizes: Histogram,
    external_rtt_latency: GaugeVec,
    external_op_latency: HistogramVec,

    // Retry-loop vectors, labeled by `op`.
    retry_started: IntCounterVec,
    retry_finished: IntCounterVec,
    retry_retries: IntCounterVec,
    retry_sleep_seconds: CounterVec,

    // Codec encode/decode vectors, labeled by `op`.
    encode_count: IntCounterVec,
    encode_seconds: CounterVec,
    decode_count: IntCounterVec,
    decode_seconds: CounterVec,

    // Batch-part read vectors, labeled by `op` (the read path).
    read_part_bytes: IntCounterVec,
    read_part_goodbytes: IntCounterVec,
    read_part_count: IntCounterVec,
    read_part_seconds: CounterVec,
    read_ts_rewrite: IntCounterVec,

    // Lock acquisition vectors, labeled by `op`.
    lock_acquire_count: IntCounterVec,
    lock_blocking_acquire_count: IntCounterVec,
    lock_blocking_seconds: CounterVec,

    // Shared (via `Arc::clone`) with every `ExternalOpMetrics` created by
    // `external_op_metrics`.
    alerts_metrics: Arc<AlertsMetrics>,
}
236
237impl MetricsVecs {
238 fn new(registry: &MetricsRegistry) -> Self {
239 MetricsVecs {
240 cmd_started: registry.register(metric!(
241 name: "mz_persist_cmd_started_count",
242 help: "count of commands started",
243 var_labels: ["cmd"],
244 )),
245 cmd_cas_mismatch: registry.register(metric!(
246 name: "mz_persist_cmd_cas_mismatch_count",
247 help: "count of command retries from CaS mismatch",
248 var_labels: ["cmd"],
249 )),
250 cmd_succeeded: registry.register(metric!(
251 name: "mz_persist_cmd_succeeded_count",
252 help: "count of commands succeeded",
253 var_labels: ["cmd"],
254 )),
255 cmd_failed: registry.register(metric!(
256 name: "mz_persist_cmd_failed_count",
257 help: "count of commands failed",
258 var_labels: ["cmd"],
259 )),
260 cmd_seconds: registry.register(metric!(
261 name: "mz_persist_cmd_seconds",
262 help: "time spent applying commands",
263 var_labels: ["cmd"],
264 )),
265
266 external_op_started: registry.register(metric!(
267 name: "mz_persist_external_started_count",
268 help: "count of external service calls started",
269 var_labels: ["op"],
270 )),
271 external_op_succeeded: registry.register(metric!(
272 name: "mz_persist_external_succeeded_count",
273 help: "count of external service calls succeeded",
274 var_labels: ["op"],
275 )),
276 external_op_failed: registry.register(metric!(
277 name: "mz_persist_external_failed_count",
278 help: "count of external service calls failed",
279 var_labels: ["op"],
280 )),
281 external_op_bytes: registry.register(metric!(
282 name: "mz_persist_external_bytes_count",
283 help: "total size represented by external service calls",
284 var_labels: ["op"],
285 )),
286 external_op_seconds: registry.register(metric!(
287 name: "mz_persist_external_seconds",
288 help: "time spent in external service calls",
289 var_labels: ["op"],
290 )),
291 external_consensus_truncated_count: registry.register(metric!(
292 name: "mz_persist_external_consensus_truncated_count",
293 help: "count of versions deleted by consensus truncate calls",
294 )),
295 external_blob_delete_noop_count: registry.register(metric!(
296 name: "mz_persist_external_blob_delete_noop_count",
297 help: "count of blob delete calls that deleted a non-existent key",
298 )),
299 external_blob_sizes: registry.register(metric!(
300 name: "mz_persist_external_blob_sizes",
301 help: "histogram of blob sizes at put time",
302 buckets: mz_ore::stats::HISTOGRAM_BYTE_BUCKETS.to_vec(),
303 )),
304 external_rtt_latency: registry.register(metric!(
305 name: "mz_persist_external_rtt_latency",
306 help: "roundtrip-time to external service as seen by this process",
307 var_labels: ["external"],
308 )),
309 external_op_latency: registry.register(metric!(
310 name: "mz_persist_external_op_latency",
311 help: "rountrip latency observed by individual performance-critical operations",
312 var_labels: ["op"],
313 buckets: histogram_seconds_buckets(0.000_500, 32.0),
316 )),
317
318 retry_started: registry.register(metric!(
319 name: "mz_persist_retry_started_count",
320 help: "count of retry loops started",
321 var_labels: ["op"],
322 )),
323 retry_finished: registry.register(metric!(
324 name: "mz_persist_retry_finished_count",
325 help: "count of retry loops finished",
326 var_labels: ["op"],
327 )),
328 retry_retries: registry.register(metric!(
329 name: "mz_persist_retry_retries_count",
330 help: "count of total attempts by retry loops",
331 var_labels: ["op"],
332 )),
333 retry_sleep_seconds: registry.register(metric!(
334 name: "mz_persist_retry_sleep_seconds",
335 help: "time spent in retry loop backoff",
336 var_labels: ["op"],
337 )),
338
339 encode_count: registry.register(metric!(
340 name: "mz_persist_encode_count",
341 help: "count of op encodes",
342 var_labels: ["op"],
343 )),
344 encode_seconds: registry.register(metric!(
345 name: "mz_persist_encode_seconds",
346 help: "time spent in op encodes",
347 var_labels: ["op"],
348 )),
349 decode_count: registry.register(metric!(
350 name: "mz_persist_decode_count",
351 help: "count of op decodes",
352 var_labels: ["op"],
353 )),
354 decode_seconds: registry.register(metric!(
355 name: "mz_persist_decode_seconds",
356 help: "time spent in op decodes",
357 var_labels: ["op"],
358 )),
359
360 read_part_bytes: registry.register(metric!(
361 name: "mz_persist_read_batch_part_bytes",
362 help: "total encoded size of batch parts read",
363 var_labels: ["op"],
364 )),
365 read_part_goodbytes: registry.register(metric!(
366 name: "mz_persist_read_batch_part_goodbytes",
367 help: "total logical size of batch parts read",
368 var_labels: ["op"],
369 )),
370 read_part_count: registry.register(metric!(
371 name: "mz_persist_read_batch_part_count",
372 help: "count of batch parts read",
373 var_labels: ["op"],
374 )),
375 read_part_seconds: registry.register(metric!(
376 name: "mz_persist_read_batch_part_seconds",
377 help: "time spent reading batch parts",
378 var_labels: ["op"],
379 )),
380 read_ts_rewrite: registry.register(metric!(
381 name: "mz_persist_read_ts_rewite",
382 help: "count of updates read with rewritten ts",
383 var_labels: ["op"],
384 )),
385
386 lock_acquire_count: registry.register(metric!(
387 name: "mz_persist_lock_acquire_count",
388 help: "count of locks acquired",
389 var_labels: ["op"],
390 )),
391 lock_blocking_acquire_count: registry.register(metric!(
392 name: "mz_persist_lock_blocking_acquire_count",
393 help: "count of locks acquired that required blocking",
394 var_labels: ["op"],
395 )),
396 lock_blocking_seconds: registry.register(metric!(
397 name: "mz_persist_lock_blocking_seconds",
398 help: "time spent blocked for a lock",
399 var_labels: ["op"],
400 )),
401
402 alerts_metrics: Arc::new(AlertsMetrics::new(registry)),
403 }
404 }
405
406 fn cmds_metrics(&self, registry: &MetricsRegistry) -> CmdsMetrics {
407 CmdsMetrics {
408 init_state: self.cmd_metrics("init_state"),
409 add_rollup: self.cmd_metrics("add_rollup"),
410 remove_rollups: self.cmd_metrics("remove_rollups"),
411 register: self.cmd_metrics("register"),
412 compare_and_append: self.cmd_metrics("compare_and_append"),
413 compare_and_append_noop: registry.register(metric!(
414 name: "mz_persist_cmd_compare_and_append_noop",
415 help: "count of compare_and_append retries that were discoverd to have already committed",
416 )),
417 compare_and_downgrade_since: self.cmd_metrics("compare_and_downgrade_since"),
418 downgrade_since: self.cmd_metrics("downgrade_since"),
419 expire_reader: self.cmd_metrics("expire_reader"),
420 expire_writer: self.cmd_metrics("expire_writer"),
421 merge_res: self.cmd_metrics("merge_res"),
422 become_tombstone: self.cmd_metrics("become_tombstone"),
423 compare_and_evolve_schema: self.cmd_metrics("compare_and_evolve_schema"),
424 spine_exert: self.cmd_metrics("spine_exert"),
425 fetch_upper_count: registry.register(metric!(
426 name: "mz_persist_cmd_fetch_upper_count",
427 help: "count of fetch_upper calls",
428 ))
429 }
430 }
431
432 fn cmd_metrics(&self, cmd: &str) -> CmdMetrics {
433 CmdMetrics {
434 name: cmd.to_owned(),
435 started: self.cmd_started.with_label_values(&[cmd]),
436 succeeded: self.cmd_succeeded.with_label_values(&[cmd]),
437 cas_mismatch: self.cmd_cas_mismatch.with_label_values(&[cmd]),
438 failed: self.cmd_failed.with_label_values(&[cmd]),
439 seconds: self.cmd_seconds.with_label_values(&[cmd]),
440 }
441 }
442
443 fn retries_metrics(&self) -> RetriesMetrics {
444 RetriesMetrics {
445 determinate: RetryDeterminate {
446 apply_unbatched_cmd_cas: self.retry_metrics("apply_unbatched_cmd::cas"),
447 },
448 external: RetryExternal {
449 batch_delete: Arc::new(self.retry_metrics("batch::delete")),
450 batch_set: self.retry_metrics("batch::set"),
451 blob_open: self.retry_metrics("blob::open"),
452 compaction_noop_delete: Arc::new(self.retry_metrics("compaction_noop::delete")),
453 consensus_open: self.retry_metrics("consensus::open"),
454 fetch_batch_get: self.retry_metrics("fetch_batch::get"),
455 fetch_state_scan: self.retry_metrics("fetch_state::scan"),
456 gc_truncate: self.retry_metrics("gc::truncate"),
457 maybe_init_cas: self.retry_metrics("maybe_init::cas"),
458 rollup_delete: self.retry_metrics("rollup::delete"),
459 rollup_get: self.retry_metrics("rollup::get"),
460 rollup_set: self.retry_metrics("rollup::set"),
461 hollow_run_get: self.retry_metrics("hollow_run::get"),
462 hollow_run_set: self.retry_metrics("hollow_run::set"),
463 storage_usage_shard_size: self.retry_metrics("storage_usage::shard_size"),
464 },
465 compare_and_append_idempotent: self.retry_metrics("compare_and_append_idempotent"),
466 fetch_latest_state: self.retry_metrics("fetch_latest_state"),
467 fetch_live_states: self.retry_metrics("fetch_live_states"),
468 idempotent_cmd: self.retry_metrics("idempotent_cmd"),
469 next_listen_batch: self.retry_metrics("next_listen_batch"),
470 snapshot: self.retry_metrics("snapshot"),
471 }
472 }
473
474 fn retry_metrics(&self, name: &str) -> RetryMetrics {
475 RetryMetrics {
476 name: name.to_owned(),
477 started: self.retry_started.with_label_values(&[name]),
478 finished: self.retry_finished.with_label_values(&[name]),
479 retries: self.retry_retries.with_label_values(&[name]),
480 sleep_seconds: self.retry_sleep_seconds.with_label_values(&[name]),
481 }
482 }
483
484 fn codecs_metrics(&self) -> CodecsMetrics {
485 CodecsMetrics {
486 state: self.codec_metrics("state"),
487 state_diff: self.codec_metrics("state_diff"),
488 batch: self.codec_metrics("batch"),
489 key: self.codec_metrics("key"),
490 val: self.codec_metrics("val"),
491 }
492 }
493
494 fn codec_metrics(&self, op: &str) -> CodecMetrics {
495 CodecMetrics {
496 encode_count: self.encode_count.with_label_values(&[op]),
497 encode_seconds: self.encode_seconds.with_label_values(&[op]),
498 decode_count: self.decode_count.with_label_values(&[op]),
499 decode_seconds: self.decode_seconds.with_label_values(&[op]),
500 }
501 }
502
503 fn blob_metrics(&self) -> BlobMetrics {
504 BlobMetrics {
505 set: self.external_op_metrics("blob_set", true),
506 get: self.external_op_metrics("blob_get", true),
507 list_keys: self.external_op_metrics("blob_list_keys", false),
508 delete: self.external_op_metrics("blob_delete", false),
509 restore: self.external_op_metrics("restore", false),
510 delete_noop: self.external_blob_delete_noop_count.clone(),
511 blob_sizes: self.external_blob_sizes.clone(),
512 rtt_latency: self.external_rtt_latency.with_label_values(&["blob"]),
513 }
514 }
515
516 fn consensus_metrics(&self) -> ConsensusMetrics {
517 ConsensusMetrics {
518 list_keys: self.external_op_metrics("consensus_list_keys", false),
519 head: self.external_op_metrics("consensus_head", false),
520 compare_and_set: self.external_op_metrics("consensus_cas", true),
521 scan: self.external_op_metrics("consensus_scan", false),
522 truncate: self.external_op_metrics("consensus_truncate", false),
523 truncated_count: self.external_consensus_truncated_count.clone(),
524 rtt_latency: self.external_rtt_latency.with_label_values(&["consensus"]),
525 }
526 }
527
528 fn external_op_metrics(&self, op: &str, latency_histogram: bool) -> ExternalOpMetrics {
529 ExternalOpMetrics {
530 started: self.external_op_started.with_label_values(&[op]),
531 succeeded: self.external_op_succeeded.with_label_values(&[op]),
532 failed: self.external_op_failed.with_label_values(&[op]),
533 bytes: self.external_op_bytes.with_label_values(&[op]),
534 seconds: self.external_op_seconds.with_label_values(&[op]),
535 seconds_histogram: if latency_histogram {
536 Some(self.external_op_latency.with_label_values(&[op]))
537 } else {
538 None
539 },
540 alerts_metrics: Arc::clone(&self.alerts_metrics),
541 }
542 }
543
544 fn batch_part_read_metrics(&self) -> BatchPartReadMetrics {
545 BatchPartReadMetrics {
546 listen: self.read_metrics("listen"),
547 snapshot: self.read_metrics("snapshot"),
548 batch_fetcher: self.read_metrics("batch_fetcher"),
549 compaction: self.read_metrics("compaction"),
550 unindexed: self.read_metrics("unindexed"),
551 }
552 }
553
554 fn read_metrics(&self, op: &str) -> ReadMetrics {
555 ReadMetrics {
556 part_bytes: self.read_part_bytes.with_label_values(&[op]),
557 part_goodbytes: self.read_part_goodbytes.with_label_values(&[op]),
558 part_count: self.read_part_count.with_label_values(&[op]),
559 seconds: self.read_part_seconds.with_label_values(&[op]),
560 ts_rewrite: self.read_ts_rewrite.with_label_values(&[op]),
561 }
562 }
563
564 fn locks_metrics(&self) -> LocksMetrics {
565 LocksMetrics {
566 applier_read_cacheable: self.lock_metrics("applier_read_cacheable"),
567 applier_read_noncacheable: self.lock_metrics("applier_read_noncacheable"),
568 applier_write: self.lock_metrics("applier_write"),
569 watch: self.lock_metrics("watch"),
570 }
571 }
572
573 fn lock_metrics(&self, op: &str) -> LockMetrics {
574 LockMetrics {
575 acquire_count: self.lock_acquire_count.with_label_values(&[op]),
576 blocking_acquire_count: self.lock_blocking_acquire_count.with_label_values(&[op]),
577 blocking_seconds: self.lock_blocking_seconds.with_label_values(&[op]),
578 }
579 }
580}
581
/// Metrics for a single persist command, all sharing the same `cmd` label.
#[derive(Debug)]
pub struct CmdMetrics {
    /// The command name, used as the `cmd` label value.
    pub(crate) name: String,
    /// Count of times the command was started.
    pub(crate) started: IntCounter,
    /// Count of command retries caused by a CaS mismatch.
    pub(crate) cas_mismatch: IntCounter,
    /// Count of times the command succeeded.
    pub(crate) succeeded: IntCounter,
    /// Count of times the command failed.
    pub(crate) failed: IntCounter,
    /// Total seconds spent applying the command.
    pub(crate) seconds: Counter,
}
591
592impl CmdMetrics {
593 pub async fn run_cmd<R, E, F, CmdFn>(
594 &self,
595 shard_metrics: &ShardMetrics,
596 cmd_fn: CmdFn,
597 ) -> Result<R, E>
598 where
599 F: std::future::Future<Output = Result<R, E>>,
600 CmdFn: FnOnce() -> F,
601 {
602 self.started.inc();
603 let start = Instant::now();
604 let res = cmd_fn().await;
605 self.seconds.inc_by(start.elapsed().as_secs_f64());
606 match res.as_ref() {
607 Ok(_) => {
608 self.succeeded.inc();
609 shard_metrics.cmd_succeeded.inc();
610 }
611 Err(_) => self.failed.inc(),
612 };
613 res
614 }
615}
616
/// Per-command metrics for every persist command, plus two unlabeled
/// command-related counters.
#[derive(Debug)]
pub struct CmdsMetrics {
    pub(crate) init_state: CmdMetrics,
    pub(crate) add_rollup: CmdMetrics,
    pub(crate) remove_rollups: CmdMetrics,
    pub(crate) register: CmdMetrics,
    pub(crate) compare_and_append: CmdMetrics,
    /// Count of compare_and_append retries that were found to have already
    /// committed.
    pub(crate) compare_and_append_noop: IntCounter,
    pub(crate) compare_and_downgrade_since: CmdMetrics,
    pub(crate) downgrade_since: CmdMetrics,
    pub(crate) expire_reader: CmdMetrics,
    pub(crate) expire_writer: CmdMetrics,
    pub(crate) merge_res: CmdMetrics,
    pub(crate) become_tombstone: CmdMetrics,
    pub(crate) compare_and_evolve_schema: CmdMetrics,
    pub(crate) spine_exert: CmdMetrics,
    /// Count of fetch_upper calls.
    pub(crate) fetch_upper_count: IntCounter,
}
635
/// Metrics for a single retry loop, all sharing the same `op` label.
#[derive(Debug)]
pub struct RetryMetrics {
    /// The retry loop name, used as the `op` label value.
    pub(crate) name: String,
    /// Count of retry loops started.
    pub(crate) started: IntCounter,
    /// Count of retry loops finished.
    pub(crate) finished: IntCounter,
    /// Count of total attempts made by the retry loop.
    pub(crate) retries: IntCounter,
    /// Total seconds spent in retry-loop backoff.
    pub(crate) sleep_seconds: Counter,
}
644
645impl RetryMetrics {
646 pub(crate) fn stream(&self, retry: RetryStream) -> MetricsRetryStream {
647 MetricsRetryStream::new(retry, self)
648 }
649}
650
/// Retry metrics grouped under `RetriesMetrics::determinate`.
#[derive(Debug)]
pub struct RetryDeterminate {
    /// Retry loop labeled `apply_unbatched_cmd::cas`.
    pub(crate) apply_unbatched_cmd_cas: RetryMetrics,
}
655
/// Retry metrics grouped under `RetriesMetrics::external`.
///
/// Each field's `op` label has the form `<area>::<operation>`, e.g.
/// `batch::delete`.
#[derive(Debug)]
pub struct RetryExternal {
    // NOTE(review): the two `Arc`-wrapped entries are presumably shared with
    // concurrently spawned work — confirm against call sites.
    pub(crate) batch_delete: Arc<RetryMetrics>,
    pub(crate) batch_set: RetryMetrics,
    pub(crate) blob_open: RetryMetrics,
    pub(crate) compaction_noop_delete: Arc<RetryMetrics>,
    pub(crate) consensus_open: RetryMetrics,
    pub(crate) fetch_batch_get: RetryMetrics,
    pub(crate) fetch_state_scan: RetryMetrics,
    pub(crate) gc_truncate: RetryMetrics,
    pub(crate) maybe_init_cas: RetryMetrics,
    pub(crate) rollup_delete: RetryMetrics,
    pub(crate) rollup_get: RetryMetrics,
    pub(crate) rollup_set: RetryMetrics,
    pub(crate) hollow_run_get: RetryMetrics,
    pub(crate) hollow_run_set: RetryMetrics,
    pub(crate) storage_usage_shard_size: RetryMetrics,
}
674
/// Metrics for all persist retry loops. Each `RetryMetrics` is labeled with
/// the same name as its field.
#[derive(Debug)]
pub struct RetriesMetrics {
    pub(crate) determinate: RetryDeterminate,
    pub(crate) external: RetryExternal,

    pub(crate) compare_and_append_idempotent: RetryMetrics,
    pub(crate) fetch_latest_state: RetryMetrics,
    pub(crate) fetch_live_states: RetryMetrics,
    pub(crate) idempotent_cmd: RetryMetrics,
    pub(crate) next_listen_batch: RetryMetrics,
    pub(crate) snapshot: RetryMetrics,
}
687
/// Batch-part read metrics, one `ReadMetrics` per read path; the field name
/// is used as the `op` label value.
#[derive(Debug)]
pub struct BatchPartReadMetrics {
    pub(crate) listen: ReadMetrics,
    pub(crate) snapshot: ReadMetrics,
    pub(crate) batch_fetcher: ReadMetrics,
    pub(crate) compaction: ReadMetrics,
    pub(crate) unindexed: ReadMetrics,
}
696
/// Batch-part read metrics for a single read path.
#[derive(Debug, Clone)]
pub struct ReadMetrics {
    /// Total encoded size of batch parts read.
    pub(crate) part_bytes: IntCounter,
    /// Total logical size of batch parts read.
    pub(crate) part_goodbytes: IntCounter,
    /// Count of batch parts read.
    pub(crate) part_count: IntCounter,
    /// Total seconds spent reading batch parts.
    pub(crate) seconds: Counter,
    /// Count of updates read with a rewritten timestamp.
    pub(crate) ts_rewrite: IntCounter,
}
705
/// Metrics for batches written by one class of writer (e.g. `user` or
/// `compaction`). Every metric name is prefixed with `mz_persist_{name}_`.
#[derive(Debug, Clone)]
pub struct BatchWriteMetrics {
    /// Total encoded size of batches written.
    pub(crate) bytes: IntCounter,
    /// Total logical size of batches written.
    pub(crate) goodbytes: IntCounter,
    /// Total seconds spent writing batches.
    pub(crate) seconds: Counter,
    /// Count of writes that stalled to await max outstanding requests.
    pub(crate) write_stalls: IntCounter,
    /// Count of writes that could not write a key lower because the size
    /// threshold was too low.
    pub(crate) key_lower_too_big: IntCounter,

    // Per-ordering children (`unordered`, `codec`, `structured`) of
    // `_order_counts`, which is retained alongside them.
    pub(crate) unordered: IntCounter,
    pub(crate) codec_order: IntCounter,
    pub(crate) structured_order: IntCounter,
    _order_counts: IntCounterVec,

    /// Seconds spent computing update stats.
    pub(crate) step_stats: Counter,
    /// Blocking seconds spent writing parts.
    pub(crate) step_part_writing: Counter,
    /// Seconds spent encoding inline batches.
    pub(crate) step_inline: Counter,
}
725
726impl BatchWriteMetrics {
727 fn new(registry: &MetricsRegistry, name: &str) -> Self {
728 let order_counts: IntCounterVec = registry.register(metric!(
729 name: format!("mz_persist_{}_write_batch_order", name),
730 help: "count of batches by the data ordering",
731 var_labels: ["order"],
732 ));
733 let unordered = order_counts.with_label_values(&["unordered"]);
734 let codec_order = order_counts.with_label_values(&["codec"]);
735 let structured_order = order_counts.with_label_values(&["structured"]);
736
737 BatchWriteMetrics {
738 bytes: registry.register(metric!(
739 name: format!("mz_persist_{}_bytes", name),
740 help: format!("total encoded size of {} batches written", name),
741 )),
742 goodbytes: registry.register(metric!(
743 name: format!("mz_persist_{}_goodbytes", name),
744 help: format!("total logical size of {} batches written", name),
745 )),
746 seconds: registry.register(metric!(
747 name: format!("mz_persist_{}_write_batch_part_seconds", name),
748 help: format!("time spent writing {} batches", name),
749 )),
750 write_stalls: registry.register(metric!(
751 name: format!("mz_persist_{}_write_stall_count", name),
752 help: format!(
753 "count of {} writes stalling to await max outstanding reqs",
754 name
755 ),
756 )),
757 key_lower_too_big: registry.register(metric!(
758 name: format!("mz_persist_{}_key_lower_too_big", name),
759 help: format!(
760 "count of {} writes that were unable to write a key lower, because the size threshold was too low",
761 name
762 ),
763 )),
764 unordered,
765 codec_order,
766 structured_order,
767 _order_counts: order_counts,
768 step_stats: registry.register(metric!(
769 name: format!("mz_persist_{}_step_stats", name),
770 help: format!("time spent computing {} update stats", name),
771 )),
772 step_part_writing: registry.register(metric!(
773 name: format!("mz_persist_{}_step_part_writing", name),
774 help: format!("blocking time spent writing parts for {} updates", name),
775 )),
776 step_inline: registry.register(metric!(
777 name: format!("mz_persist_{}_step_inline", name),
778 help: format!("time spent encoding {} inline batches", name)
779 )),
780 }
781 }
782}
783
/// Compaction metrics; all are registered by `CompactionMetrics::new`.
#[derive(Debug)]
pub struct CompactionMetrics {
    /// Count of total compaction requests.
    pub(crate) requested: IntCounter,
    /// Count of compaction requests dropped due to a full queue.
    pub(crate) dropped: IntCounter,
    /// Count of compaction requests dropped because compaction was disabled.
    pub(crate) disabled: IntCounter,
    /// Count of compactions skipped due to heuristics.
    pub(crate) skipped: IntCounter,
    /// Count of compactions started.
    pub(crate) started: IntCounter,
    /// Count of compactions applied to state.
    pub(crate) applied: IntCounter,
    /// Count of compactions that timed out.
    pub(crate) timed_out: IntCounter,
    /// Count of compactions failed.
    pub(crate) failed: IntCounter,
    /// Count of compactions discarded as obsolete.
    pub(crate) noop: IntCounter,
    /// Total seconds spent in compaction.
    pub(crate) seconds: Counter,
    /// Count of compaction requests that ever blocked on the concurrency limit.
    pub(crate) concurrency_waits: IntCounter,
    /// Total seconds compaction requests spent queued.
    pub(crate) queued_seconds: Counter,
    /// Count of compaction memory-requirement violations.
    pub(crate) memory_violations: IntCounter,
    /// Count of runs compacted.
    pub(crate) runs_compacted: IntCounter,
    /// Count of run chunks compacted.
    pub(crate) chunks_compacted: IntCounter,
    /// Count of compactions where not all inputs were prefetched.
    pub(crate) not_all_prefetched: IntCounter,
    /// Count of parts completely prefetched by the time they were needed.
    pub(crate) parts_prefetched: IntCounter,
    /// Count of parts that had to be waited on.
    pub(crate) parts_waited: IntCounter,
    /// Count of requests that could have used the fast-path optimization.
    pub(crate) fast_path_eligible: IntCounter,
    /// Count of compaction requests performed by admin tooling.
    pub(crate) admin_count: IntCounter,

    /// Count of merge results that exactly replaced a SpineBatch.
    pub(crate) applied_exact_match: IntCounter,
    /// Count of merge results that replaced a subset of a SpineBatch.
    pub(crate) applied_subset_match: IntCounter,
    /// Count of merge results not applied due to too many updates.
    pub(crate) not_applied_too_many_updates: IntCounter,

    /// Batch-write metrics with the `compaction` name prefix.
    pub(crate) batch: BatchWriteMetrics,
    /// Per-step timings, children of `mz_persist_compaction_step_seconds`.
    pub(crate) steps: CompactionStepTimings,
    /// Schema-selection counts, children of
    /// `mz_persist_compaction_schema_selection`.
    pub(crate) schema_selection: CompactionSchemaSelection,

    // The `step`-labeled vector that `steps` was derived from.
    pub(crate) _steps_vec: CounterVec,
}
817
818impl CompactionMetrics {
819 fn new(registry: &MetricsRegistry) -> Self {
820 let step_timings: CounterVec = registry.register(metric!(
821 name: "mz_persist_compaction_step_seconds",
822 help: "time spent on individual steps of compaction",
823 var_labels: ["step"],
824 ));
825 let schema_selection: CounterVec = registry.register(metric!(
826 name: "mz_persist_compaction_schema_selection",
827 help: "count of compactions and how we did schema selection",
828 var_labels: ["selection"],
829 ));
830
831 CompactionMetrics {
832 requested: registry.register(metric!(
833 name: "mz_persist_compaction_requested",
834 help: "count of total compaction requests",
835 )),
836 dropped: registry.register(metric!(
837 name: "mz_persist_compaction_dropped",
838 help: "count of total compaction requests dropped due to a full queue",
839 )),
840 disabled: registry.register(metric!(
841 name: "mz_persist_compaction_disabled",
842 help: "count of total compaction requests dropped because compaction was disabled",
843 )),
844 skipped: registry.register(metric!(
845 name: "mz_persist_compaction_skipped",
846 help: "count of compactions skipped due to heuristics",
847 )),
848 started: registry.register(metric!(
849 name: "mz_persist_compaction_started",
850 help: "count of compactions started",
851 )),
852 failed: registry.register(metric!(
853 name: "mz_persist_compaction_failed",
854 help: "count of compactions failed",
855 )),
856 applied: registry.register(metric!(
857 name: "mz_persist_compaction_applied",
858 help: "count of compactions applied to state",
859 )),
860 timed_out: registry.register(metric!(
861 name: "mz_persist_compaction_timed_out",
862 help: "count of compactions that timed out",
863 )),
864 noop: registry.register(metric!(
865 name: "mz_persist_compaction_noop",
866 help: "count of compactions discarded (obsolete)",
867 )),
868 seconds: registry.register(metric!(
869 name: "mz_persist_compaction_seconds",
870 help: "time spent in compaction",
871 )),
872 concurrency_waits: registry.register(metric!(
873 name: "mz_persist_compaction_concurrency_waits",
874 help: "count of compaction requests that ever blocked due to concurrency limit",
875 )),
876 queued_seconds: registry.register(metric!(
877 name: "mz_persist_compaction_queued_seconds",
878 help: "time that compaction requests spent queued",
879 )),
880 memory_violations: registry.register(metric!(
881 name: "mz_persist_compaction_memory_violations",
882 help: "count of compaction memory requirement violations",
883 )),
884 runs_compacted: registry.register(metric!(
885 name: "mz_persist_compaction_runs_compacted",
886 help: "count of runs compacted",
887 )),
888 chunks_compacted: registry.register(metric!(
889 name: "mz_persist_compaction_chunks_compacted",
890 help: "count of run chunks compacted",
891 )),
892 not_all_prefetched: registry.register(metric!(
893 name: "mz_persist_compaction_not_all_prefetched",
894 help: "count of compactions where not all inputs were prefetched",
895 )),
896 parts_prefetched: registry.register(metric!(
897 name: "mz_persist_compaction_parts_prefetched",
898 help: "count of compaction parts completely prefetched by the time they're needed",
899 )),
900 parts_waited: registry.register(metric!(
901 name: "mz_persist_compaction_parts_waited",
902 help: "count of compaction parts that had to be waited on",
903 )),
904 fast_path_eligible: registry.register(metric!(
905 name: "mz_persist_compaction_fast_path_eligible",
906 help: "count of compaction requests that could have used the fast-path optimization",
907 )),
908 admin_count: registry.register(metric!(
909 name: "mz_persist_compaction_admin_count",
910 help: "count of compaction requests that were performed by admin tooling",
911 )),
912 applied_exact_match: registry.register(metric!(
913 name: "mz_persist_compaction_applied_exact_match",
914 help: "count of merge results that exactly replaced a SpineBatch",
915 )),
916 applied_subset_match: registry.register(metric!(
917 name: "mz_persist_compaction_applied_subset_match",
918 help: "count of merge results that replaced a subset of a SpineBatch",
919 )),
920 not_applied_too_many_updates: registry.register(metric!(
921 name: "mz_persist_compaction_not_applied_too_many_updates",
922 help: "count of merge results that did not apply due to too many updates",
923 )),
924 batch: BatchWriteMetrics::new(registry, "compaction"),
925 steps: CompactionStepTimings::new(step_timings.clone()),
926 schema_selection: CompactionSchemaSelection::new(schema_selection.clone()),
927 _steps_vec: step_timings,
928 }
929 }
930}
931
/// Seconds spent in individual compaction steps, one `step`-labeled counter
/// per step.
#[derive(Debug)]
pub struct CompactionStepTimings {
    /// Counter labeled `part_fetch`.
    pub(crate) part_fetch_seconds: Counter,
    /// Counter labeled `heap_population`.
    pub(crate) heap_population_seconds: Counter,
}
937
938impl CompactionStepTimings {
939 fn new(step_timings: CounterVec) -> CompactionStepTimings {
940 CompactionStepTimings {
941 part_fetch_seconds: step_timings.with_label_values(&["part_fetch"]),
942 heap_population_seconds: step_timings.with_label_values(&["heap_population"]),
943 }
944 }
945}
946
/// Counts of how schema selection was performed for compactions, one
/// `selection`-labeled counter per outcome.
#[derive(Debug)]
pub struct CompactionSchemaSelection {
    /// Counter labeled `recent`.
    pub(crate) recent_schema: Counter,
    /// Counter labeled `none`.
    pub(crate) no_schema: Counter,
}
952
953impl CompactionSchemaSelection {
954 fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
955 CompactionSchemaSelection {
956 recent_schema: schema_selection.with_label_values(&["recent"]),
957 no_schema: schema_selection.with_label_values(&["none"]),
958 }
959 }
960}
961
/// Metrics describing persist garbage collection activity.
#[derive(Debug)]
pub struct GcMetrics {
    /// Count of GCs skipped because they were already done (`mz_persist_gc_noop`).
    pub(crate) noop: IntCounter,
    /// Count of GCs started (`mz_persist_gc_started`).
    pub(crate) started: IntCounter,
    /// Count of GCs finished (`mz_persist_gc_finished`).
    pub(crate) finished: IntCounter,
    /// Count of GC requests merged (`mz_persist_gc_merged_reqs`).
    pub(crate) merged: IntCounter,
    /// Total time spent in GC (`mz_persist_gc_seconds`).
    pub(crate) seconds: Counter,
    /// Per-step timings, children of `mz_persist_gc_step_seconds`.
    pub(crate) steps: GcStepTimings,
}
971
972#[derive(Debug)]
973pub struct GcStepTimings {
974 pub(crate) find_removable_rollups: Counter,
975 pub(crate) fetch_seconds: Counter,
976 pub(crate) find_deletable_blobs_seconds: Counter,
977 pub(crate) delete_rollup_seconds: Counter,
978 pub(crate) delete_batch_part_seconds: Counter,
979 pub(crate) truncate_diff_seconds: Counter,
980 pub(crate) remove_rollups_from_state: Counter,
981 pub(crate) post_gc_calculations_seconds: Counter,
982}
983
984impl GcStepTimings {
985 fn new(step_timings: CounterVec) -> Self {
986 Self {
987 find_removable_rollups: step_timings.with_label_values(&["find_removable_rollups"]),
988 fetch_seconds: step_timings.with_label_values(&["fetch"]),
989 find_deletable_blobs_seconds: step_timings.with_label_values(&["find_deletable_blobs"]),
990 delete_rollup_seconds: step_timings.with_label_values(&["delete_rollup"]),
991 delete_batch_part_seconds: step_timings.with_label_values(&["delete_batch_part"]),
992 truncate_diff_seconds: step_timings.with_label_values(&["truncate_diff"]),
993 remove_rollups_from_state: step_timings
994 .with_label_values(&["remove_rollups_from_state"]),
995 post_gc_calculations_seconds: step_timings.with_label_values(&["post_gc_calculations"]),
996 }
997 }
998}
999
1000impl GcMetrics {
1001 fn new(registry: &MetricsRegistry) -> Self {
1002 let step_timings: CounterVec = registry.register(metric!(
1003 name: "mz_persist_gc_step_seconds",
1004 help: "time spent on individual steps of gc",
1005 var_labels: ["step"],
1006 ));
1007 GcMetrics {
1008 noop: registry.register(metric!(
1009 name: "mz_persist_gc_noop",
1010 help: "count of garbage collections skipped because they were already done",
1011 )),
1012 started: registry.register(metric!(
1013 name: "mz_persist_gc_started",
1014 help: "count of garbage collections started",
1015 )),
1016 finished: registry.register(metric!(
1017 name: "mz_persist_gc_finished",
1018 help: "count of garbage collections finished",
1019 )),
1020 merged: registry.register(metric!(
1021 name: "mz_persist_gc_merged_reqs",
1022 help: "count of garbage collection requests merged",
1023 )),
1024 seconds: registry.register(metric!(
1025 name: "mz_persist_gc_seconds",
1026 help: "time spent in garbage collections",
1027 )),
1028 steps: GcStepTimings::new(step_timings),
1029 }
1030 }
1031}
1032
1033#[derive(Debug)]
1034pub struct LeaseMetrics {
1035 pub(crate) timeout_read: IntCounter,
1036 pub(crate) dropped_part: IntCounter,
1037}
1038
1039impl LeaseMetrics {
1040 fn new(registry: &MetricsRegistry) -> Self {
1041 LeaseMetrics {
1042 timeout_read: registry.register(metric!(
1043 name: "mz_persist_lease_timeout_read",
1044 help: "count of readers whose lease timed out",
1045 )),
1046 dropped_part: registry.register(metric!(
1047 name: "mz_persist_lease_dropped_part",
1048 help: "count of LeasedBatchParts that were dropped without being politely returned",
1049 )),
1050 }
1051 }
1052}
1053
1054struct IncOnDrop(IntCounter);
1055
1056impl Drop for IncOnDrop {
1057 fn drop(&mut self) {
1058 self.0.inc()
1059 }
1060}
1061
1062pub struct MetricsRetryStream {
1063 retry: RetryStream,
1064 pub(crate) retries: IntCounter,
1065 sleep_seconds: Counter,
1066 _finished: IncOnDrop,
1067}
1068
1069impl MetricsRetryStream {
1070 pub fn new(retry: RetryStream, metrics: &RetryMetrics) -> Self {
1071 metrics.started.inc();
1072 MetricsRetryStream {
1073 retry,
1074 retries: metrics.retries.clone(),
1075 sleep_seconds: metrics.sleep_seconds.clone(),
1076 _finished: IncOnDrop(metrics.finished.clone()),
1077 }
1078 }
1079
1080 pub fn attempt(&self) -> usize {
1082 self.retry.attempt()
1083 }
1084
1085 pub fn next_sleep(&self) -> Duration {
1087 self.retry.next_sleep()
1088 }
1089
1090 pub async fn sleep(self) -> Self {
1095 self.retries.inc();
1096 self.sleep_seconds
1097 .inc_by(self.retry.next_sleep().as_secs_f64());
1098 let retry = self.retry.sleep().await;
1099 MetricsRetryStream {
1100 retry,
1101 retries: self.retries,
1102 sleep_seconds: self.sleep_seconds,
1103 _finished: self._finished,
1104 }
1105 }
1106}
1107
/// Encode/decode metrics, one [`CodecMetrics`] per kind of encoded data.
// NOTE(review): the per-field meanings below are inferred from field names;
// confirm against the constructor.
#[derive(Debug)]
pub struct CodecsMetrics {
    /// Presumably full state encodes/decodes.
    pub(crate) state: CodecMetrics,
    /// Presumably state diff encodes/decodes.
    pub(crate) state_diff: CodecMetrics,
    /// Presumably batch encodes/decodes.
    pub(crate) batch: CodecMetrics,
    /// Presumably key encodes/decodes.
    pub(crate) key: CodecMetrics,
    /// Presumably value encodes/decodes.
    pub(crate) val: CodecMetrics,
}
1118
1119#[derive(Debug)]
1120pub struct CodecMetrics {
1121 pub(crate) encode_count: IntCounter,
1122 pub(crate) encode_seconds: Counter,
1123 pub(crate) decode_count: IntCounter,
1124 pub(crate) decode_seconds: Counter,
1125}
1126
1127impl CodecMetrics {
1128 pub(crate) fn encode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1129 let now = Instant::now();
1130 let r = f();
1131 self.encode_count.inc();
1132 self.encode_seconds.inc_by(now.elapsed().as_secs_f64());
1133 r
1134 }
1135
1136 pub(crate) fn decode<R, F: FnOnce() -> R>(&self, f: F) -> R {
1137 let now = Instant::now();
1138 let r = f();
1139 self.decode_count.inc();
1140 self.decode_seconds.inc_by(now.elapsed().as_secs_f64());
1141 r
1142 }
1143}
1144
/// Counters describing how state/spine updates and rollup writes were applied.
#[derive(Debug)]
pub struct StateMetrics {
    /// Spine diff applications that hit the fast path.
    pub(crate) apply_spine_fast_path: IntCounter,
    /// Spine diff applications that hit the slow path.
    pub(crate) apply_spine_slow_path: IntCounter,
    /// Spine diff applications that hit the lenient compaction apply path.
    pub(crate) apply_spine_slow_path_lenient: IntCounter,
    /// Adjustments made by the lenient compaction apply path.
    pub(crate) apply_spine_slow_path_lenient_adjustment: IntCounter,
    /// Slow-path spine diff applications that needed an extra spine reconstruction step.
    pub(crate) apply_spine_slow_path_with_reconstruction: IntCounter,
    /// Spine diff applications that flattened the trace.
    pub(crate) apply_spine_flattened: IntCounter,
    /// State updates that no-oped due to shared state.
    pub(crate) update_state_noop_path: IntCounter,
    /// State updates that found no new updates.
    pub(crate) update_state_empty_path: IntCounter,
    /// State updates that hit the fast path.
    pub(crate) update_state_fast_path: IntCounter,
    /// State updates that hit the slow path.
    pub(crate) update_state_slow_path: IntCounter,
    /// `fetch_rollup_at_seqno` calls that only worked because of the migration.
    pub(crate) rollup_at_seqno_migration: IntCounter,
    /// `fetch_recent_live_diffs` calls that hit the fast path.
    pub(crate) fetch_recent_live_diffs_fast_path: IntCounter,
    /// `fetch_recent_live_diffs` calls that hit the slow path.
    pub(crate) fetch_recent_live_diffs_slow_path: IntCounter,
    /// Writers added to state.
    pub(crate) writer_added: IntCounter,
    /// Writers removed from state.
    pub(crate) writer_removed: IntCounter,
    /// Times hostname diffs had to be force applied.
    pub(crate) force_apply_hostname: IntCounter,
    /// Rollups written successfully (not necessarily linked into state).
    pub(crate) rollup_write_success: IntCounter,
    /// No-op rollup writes with reason `latest`.
    pub(crate) rollup_write_noop_latest: IntCounter,
    /// No-op rollup writes with reason `truncated`.
    pub(crate) rollup_write_noop_truncated: IntCounter,
}
1167
1168impl StateMetrics {
1169 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
1170 let rollup_write_noop: IntCounterVec = registry.register(metric!(
1171 name: "mz_persist_state_rollup_write_noop",
1172 help: "count of no-op rollup writes",
1173 var_labels: ["reason"],
1174 ));
1175
1176 StateMetrics {
1177 apply_spine_fast_path: registry.register(metric!(
1178 name: "mz_persist_state_apply_spine_fast_path",
1179 help: "count of spine diff applications that hit the fast path",
1180 )),
1181 apply_spine_slow_path: registry.register(metric!(
1182 name: "mz_persist_state_apply_spine_slow_path",
1183 help: "count of spine diff applications that hit the slow path",
1184 )),
1185 apply_spine_slow_path_lenient: registry.register(metric!(
1186 name: "mz_persist_state_apply_spine_slow_path_lenient",
1187 help: "count of spine diff applications that hit the lenient compaction apply path",
1188 )),
1189 apply_spine_slow_path_lenient_adjustment: registry.register(metric!(
1190 name: "mz_persist_state_apply_spine_slow_path_lenient_adjustment",
1191 help: "count of adjustments made by the lenient compaction apply path",
1192 )),
1193 apply_spine_slow_path_with_reconstruction: registry.register(metric!(
1194 name: "mz_persist_state_apply_spine_slow_path_with_reconstruction",
1195 help: "count of spine diff applications that hit the slow path with extra spine reconstruction step",
1196 )),
1197 apply_spine_flattened: registry.register(metric!(
1198 name: "mz_persist_state_apply_spine_flattened",
1199 help: "count of spine diff applications that flatten the trace",
1200 )),
1201 update_state_noop_path: registry.register(metric!(
1202 name: "mz_persist_state_update_state_noop_path",
1203 help: "count of state update applications that no-oped due to shared state",
1204 )),
1205 update_state_empty_path: registry.register(metric!(
1206 name: "mz_persist_state_update_state_empty_path",
1207 help: "count of state update applications that found no new updates",
1208 )),
1209 update_state_fast_path: registry.register(metric!(
1210 name: "mz_persist_state_update_state_fast_path",
1211 help: "count of state update applications that hit the fast path",
1212 )),
1213 update_state_slow_path: registry.register(metric!(
1214 name: "mz_persist_state_update_state_slow_path",
1215 help: "count of state update applications that hit the slow path",
1216 )),
1217 rollup_at_seqno_migration: registry.register(metric!(
1218 name: "mz_persist_state_rollup_at_seqno_migration",
1219 help: "count of fetch_rollup_at_seqno calls that only worked because of the migration",
1220 )),
1221 fetch_recent_live_diffs_fast_path: registry.register(metric!(
1222 name: "mz_persist_state_fetch_recent_live_diffs_fast_path",
1223 help: "count of fetch_recent_live_diffs that hit the fast path",
1224 )),
1225 fetch_recent_live_diffs_slow_path: registry.register(metric!(
1226 name: "mz_persist_state_fetch_recent_live_diffs_slow_path",
1227 help: "count of fetch_recent_live_diffs that hit the slow path",
1228 )),
1229 writer_added: registry.register(metric!(
1230 name: "mz_persist_state_writer_added",
1231 help: "count of writers added to the state",
1232 )),
1233 writer_removed: registry.register(metric!(
1234 name: "mz_persist_state_writer_removed",
1235 help: "count of writers removed from the state",
1236 )),
1237 force_apply_hostname: registry.register(metric!(
1238 name: "mz_persist_state_force_applied_hostname",
1239 help: "count of when hostname diffs needed to be force applied",
1240 )),
1241 rollup_write_success: registry.register(metric!(
1242 name: "mz_persist_state_rollup_write_success",
1243 help: "count of rollups written successful (may not be linked in to state)",
1244 )),
1245 rollup_write_noop_latest: rollup_write_noop.with_label_values(&["latest"]),
1246 rollup_write_noop_truncated: rollup_write_noop.with_label_values(&["truncated"]),
1247 }
1248 }
1249}
1250
/// Process-wide registry of per-shard metric vecs.
///
/// Each vec is labeled by `["shard", "name"]`; the batch-part version vecs add
/// `"version"`. Per-shard handles are handed out as [`ShardMetrics`] via
/// [`ShardsMetrics::shard`].
#[derive(Debug)]
pub struct ShardsMetrics {
    // Computed gauge (`mz_persist_shard_count`) counting live entries in `shards`.
    _count: ComputedIntGauge,
    since: mz_ore::metrics::IntGaugeVec,
    upper: mz_ore::metrics::IntGaugeVec,
    encoded_rollup_size: mz_ore::metrics::UIntGaugeVec,
    encoded_diff_size: mz_ore::metrics::IntCounterVec,
    hollow_batch_count: mz_ore::metrics::UIntGaugeVec,
    spine_batch_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    update_count: mz_ore::metrics::UIntGaugeVec,
    rollup_count: mz_ore::metrics::UIntGaugeVec,
    largest_batch_size: mz_ore::metrics::UIntGaugeVec,
    seqnos_held: mz_ore::metrics::UIntGaugeVec,
    seqnos_since_last_rollup: mz_ore::metrics::UIntGaugeVec,
    gc_seqno_held_parts: mz_ore::metrics::UIntGaugeVec,
    gc_live_diffs: mz_ore::metrics::UIntGaugeVec,
    gc_finished: mz_ore::metrics::IntCounterVec,
    compaction_applied: mz_ore::metrics::IntCounterVec,
    cmd_succeeded: mz_ore::metrics::IntCounterVec,
    usage_current_state_batches_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_current_state_rollups_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_referenced_not_current_state_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_not_leaked_not_referenced_bytes: mz_ore::metrics::UIntGaugeVec,
    usage_leaked_bytes: mz_ore::metrics::UIntGaugeVec,
    pubsub_push_diff_applied: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_stale: mz_ore::metrics::IntCounterVec,
    pubsub_push_diff_not_applied_out_of_order: mz_ore::metrics::IntCounterVec,
    stale_version: mz_ore::metrics::UIntGaugeVec,
    blob_gets: mz_ore::metrics::IntCounterVec,
    blob_sets: mz_ore::metrics::IntCounterVec,
    live_writers: mz_ore::metrics::UIntGaugeVec,
    unconsolidated_snapshot: mz_ore::metrics::IntCounterVec,
    backpressure_emitted_bytes: IntCounterVec,
    backpressure_last_backpressured_bytes: UIntGaugeVec,
    backpressure_retired_bytes: IntCounterVec,
    rewrite_part_count: UIntGaugeVec,
    inline_part_count: UIntGaugeVec,
    inline_part_bytes: UIntGaugeVec,
    compact_batches: UIntGaugeVec,
    compacting_batches: UIntGaugeVec,
    noncompact_batches: UIntGaugeVec,
    schema_registry_version_count: UIntGaugeVec,
    inline_backpressure_count: IntCounterVec,
    // Weak handles to the per-shard metrics handed out by `shard`. Entries whose
    // `ShardMetrics` has been dropped are pruned lazily.
    shards: Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
}
1305
1306impl ShardsMetrics {
1307 fn new(registry: &MetricsRegistry) -> Self {
1308 let shards = Arc::new(Mutex::new(BTreeMap::new()));
1309 let shards_count = Arc::clone(&shards);
1310 ShardsMetrics {
1311 _count: registry.register_computed_gauge(
1312 metric!(
1313 name: "mz_persist_shard_count",
1314 help: "count of all active shards on this process",
1315 ),
1316 move || {
1317 let mut ret = 0;
1318 Self::compute(&shards_count, |_m| ret += 1);
1319 ret
1320 },
1321 ),
1322 since: registry.register(metric!(
1323 name: "mz_persist_shard_since",
1324 help: "since by shard",
1325 var_labels: ["shard", "name"],
1326 )),
1327 upper: registry.register(metric!(
1328 name: "mz_persist_shard_upper",
1329 help: "upper by shard",
1330 var_labels: ["shard", "name"],
1331 )),
1332 encoded_rollup_size: registry.register(metric!(
1333 name: "mz_persist_shard_rollup_size_bytes",
1334 help: "total encoded rollup size by shard",
1335 var_labels: ["shard", "name"],
1336 )),
1337 encoded_diff_size: registry.register(metric!(
1338 name: "mz_persist_shard_diff_size_bytes",
1339 help: "total encoded diff size by shard",
1340 var_labels: ["shard", "name"],
1341 )),
1342 hollow_batch_count: registry.register(metric!(
1343 name: "mz_persist_shard_hollow_batch_count",
1344 help: "count of hollow batches by shard",
1345 var_labels: ["shard", "name"],
1346 )),
1347 spine_batch_count: registry.register(metric!(
1348 name: "mz_persist_shard_spine_batch_count",
1349 help: "count of spine batches by shard",
1350 var_labels: ["shard", "name"],
1351 )),
1352 batch_part_count: registry.register(metric!(
1353 name: "mz_persist_shard_batch_part_count",
1354 help: "count of batch parts by shard",
1355 var_labels: ["shard", "name"],
1356 )),
1357 batch_part_version_count: registry.register(metric!(
1358 name: "mz_persist_shard_batch_part_version_count",
1359 help: "count of batch parts by shard and version",
1360 var_labels: ["shard", "name", "version"],
1361 )),
1362 batch_part_version_bytes: registry.register(metric!(
1363 name: "mz_persist_shard_batch_part_version_bytes",
1364 help: "total bytes in batch parts by shard and version",
1365 var_labels: ["shard", "name", "version"],
1366 )),
1367 update_count: registry.register(metric!(
1368 name: "mz_persist_shard_update_count",
1369 help: "count of updates by shard",
1370 var_labels: ["shard", "name"],
1371 )),
1372 rollup_count: registry.register(metric!(
1373 name: "mz_persist_shard_rollup_count",
1374 help: "count of rollups by shard",
1375 var_labels: ["shard", "name"],
1376 )),
1377 largest_batch_size: registry.register(metric!(
1378 name: "mz_persist_shard_largest_batch_size",
1379 help: "largest encoded batch size by shard",
1380 var_labels: ["shard", "name"],
1381 )),
1382 seqnos_held: registry.register(metric!(
1383 name: "mz_persist_shard_seqnos_held",
1384 help: "maximum count of gc-ineligible states by shard",
1385 var_labels: ["shard", "name"],
1386 )),
1387 seqnos_since_last_rollup: registry.register(metric!(
1388 name: "mz_persist_shard_seqnos_since_last_rollup",
1389 help: "count of seqnos since last rollup",
1390 var_labels: ["shard", "name"],
1391 )),
1392 gc_seqno_held_parts: registry.register(metric!(
1393 name: "mz_persist_shard_gc_seqno_held_parts",
1394 help: "count of parts referenced by some live state but not the current state (ie. parts kept only to satisfy seqno holds) at GC time",
1395 var_labels: ["shard", "name"],
1396 )),
1397 gc_live_diffs: registry.register(metric!(
1398 name: "mz_persist_shard_gc_live_diffs",
1399 help: "the number of diffs (or, alternatively, the number of seqnos) present in consensus state at GC time",
1400 var_labels: ["shard", "name"],
1401 )),
1402 gc_finished: registry.register(metric!(
1403 name: "mz_persist_shard_gc_finished",
1404 help: "count of garbage collections finished by shard",
1405 var_labels: ["shard", "name"],
1406 )),
1407 compaction_applied: registry.register(metric!(
1408 name: "mz_persist_shard_compaction_applied",
1409 help: "count of compactions applied to state by shard",
1410 var_labels: ["shard", "name"],
1411 )),
1412 cmd_succeeded: registry.register(metric!(
1413 name: "mz_persist_shard_cmd_succeeded",
1414 help: "count of commands succeeded by shard",
1415 var_labels: ["shard", "name"],
1416 )),
1417 usage_current_state_batches_bytes: registry.register(metric!(
1418 name: "mz_persist_shard_usage_current_state_batches_bytes",
1419 help: "data in batches/parts referenced by current version of state",
1420 var_labels: ["shard", "name"],
1421 )),
1422 usage_current_state_rollups_bytes: registry.register(metric!(
1423 name: "mz_persist_shard_usage_current_state_rollups_bytes",
1424 help: "data in rollups referenced by current version of state",
1425 var_labels: ["shard", "name"],
1426 )),
1427 usage_referenced_not_current_state_bytes: registry.register(metric!(
1428 name: "mz_persist_shard_usage_referenced_not_current_state_bytes",
1429 help: "data referenced only by a previous version of state",
1430 var_labels: ["shard", "name"],
1431 )),
1432 usage_not_leaked_not_referenced_bytes: registry.register(metric!(
1433 name: "mz_persist_shard_usage_not_leaked_not_referenced_bytes",
1434 help: "data written by an active writer but not referenced by any version of state",
1435 var_labels: ["shard", "name"],
1436 )),
1437 usage_leaked_bytes: registry.register(metric!(
1438 name: "mz_persist_shard_usage_leaked_bytes",
1439 help: "data reclaimable by a leaked blob detector",
1440 var_labels: ["shard", "name"],
1441 )),
1442 pubsub_push_diff_applied: registry.register(metric!(
1443 name: "mz_persist_shard_pubsub_diff_applied",
1444 help: "number of diffs received via pubsub that applied",
1445 var_labels: ["shard", "name"],
1446 )),
1447 pubsub_push_diff_not_applied_stale: registry.register(metric!(
1448 name: "mz_persist_shard_pubsub_diff_not_applied_stale",
1449 help: "number of diffs received via pubsub that did not apply due to staleness",
1450 var_labels: ["shard", "name"],
1451 )),
1452 pubsub_push_diff_not_applied_out_of_order: registry.register(metric!(
1453 name: "mz_persist_shard_pubsub_diff_not_applied_out_of_order",
1454 help: "number of diffs received via pubsub that did not apply due to out-of-order delivery",
1455 var_labels: ["shard", "name"],
1456 )),
1457 stale_version: registry.register(metric!(
1458 name: "mz_persist_shard_stale_version",
1459 help: "indicates whether the current version of the shard is less than the current version of the code",
1460 var_labels: ["shard", "name"],
1461 )),
1462 blob_gets: registry.register(metric!(
1463 name: "mz_persist_shard_blob_gets",
1464 help: "number of Blob::get calls for this shard",
1465 var_labels: ["shard", "name"],
1466 )),
1467 blob_sets: registry.register(metric!(
1468 name: "mz_persist_shard_blob_sets",
1469 help: "number of Blob::set calls for this shard",
1470 var_labels: ["shard", "name"],
1471 )),
1472 live_writers: registry.register(metric!(
1473 name: "mz_persist_shard_live_writers",
1474 help: "number of writers that have recently appended updates to this shard",
1475 var_labels: ["shard", "name"],
1476 )),
1477 unconsolidated_snapshot: registry.register(metric!(
1478 name: "mz_persist_shard_unconsolidated_snapshot",
1479 help: "in snapshot_and_read, the number of times consolidating the raw data wasn't enough to produce consolidated output",
1480 var_labels: ["shard", "name"],
1481 )),
1482 backpressure_emitted_bytes: registry.register(metric!(
1483 name: "mz_persist_backpressure_emitted_bytes",
1484 help: "A counter with the number of emitted bytes.",
1485 var_labels: ["shard", "name"],
1486 )),
1487 backpressure_last_backpressured_bytes: registry.register(metric!(
1488 name: "mz_persist_backpressure_last_backpressured_bytes",
1489 help: "The last count of bytes we are waiting to be retired in \
1490 the operator. This cannot be directly compared to \
1491 `retired_bytes`, but CAN indicate that backpressure is happening.",
1492 var_labels: ["shard", "name"],
1493 )),
1494 backpressure_retired_bytes: registry.register(metric!(
1495 name: "mz_persist_backpressure_retired_bytes",
1496 help:"A counter with the number of bytes retired by downstream processing.",
1497 var_labels: ["shard", "name"],
1498 )),
1499 rewrite_part_count: registry.register(metric!(
1500 name: "mz_persist_shard_rewrite_part_count",
1501 help: "count of batch parts with rewrites by shard",
1502 var_labels: ["shard", "name"],
1503 )),
1504 inline_part_count: registry.register(metric!(
1505 name: "mz_persist_shard_inline_part_count",
1506 help: "count of parts inline in shard metadata",
1507 var_labels: ["shard", "name"],
1508 )),
1509 inline_part_bytes: registry.register(metric!(
1510 name: "mz_persist_shard_inline_part_bytes",
1511 help: "total size of parts inline in shard metadata",
1512 var_labels: ["shard", "name"],
1513 )),
1514 compact_batches: registry.register(metric!(
1515 name: "mz_persist_shard_compact_batches",
1516 help: "number of fully compact batches in the shard",
1517 var_labels: ["shard", "name"],
1518 )),
1519 compacting_batches: registry.register(metric!(
1520 name: "mz_persist_shard_compacting_batches",
1521 help: "number of batches in the shard with compactions in progress",
1522 var_labels: ["shard", "name"],
1523 )),
1524 noncompact_batches: registry.register(metric!(
1525 name: "mz_persist_shard_noncompact_batches",
1526 help: "number of batches in the shard that aren't compact and have no ongoing compaction",
1527 var_labels: ["shard", "name"],
1528 )),
1529 schema_registry_version_count: registry.register(metric!(
1530 name: "mz_persist_shard_schema_registry_version_count",
1531 help: "count of versions in the schema registry",
1532 var_labels: ["shard", "name"],
1533 )),
1534 inline_backpressure_count: registry.register(metric!(
1535 name: "mz_persist_shard_inline_backpressure_count",
1536 help: "count of CaA attempts retried because of inline backpressure",
1537 var_labels: ["shard", "name"],
1538 )),
1539 shards,
1540 }
1541 }
1542
1543 pub fn shard(&self, shard_id: &ShardId, name: &str) -> Arc<ShardMetrics> {
1544 let mut shards = self.shards.lock().expect("mutex poisoned");
1545 if let Some(shard) = shards.get(shard_id) {
1546 if let Some(shard) = shard.upgrade() {
1547 return Arc::clone(&shard);
1548 } else {
1549 assert!(shards.remove(shard_id).is_some());
1550 }
1551 }
1552 let shard = Arc::new(ShardMetrics::new(shard_id, name, self));
1553 assert!(
1554 shards
1555 .insert(shard_id.clone(), Arc::downgrade(&shard))
1556 .is_none()
1557 );
1558 shard
1559 }
1560
1561 fn compute<F: FnMut(&ShardMetrics)>(
1562 shards: &Arc<Mutex<BTreeMap<ShardId, Weak<ShardMetrics>>>>,
1563 mut f: F,
1564 ) {
1565 let mut shards = shards.lock().expect("mutex poisoned");
1566 let mut deleted_shards = Vec::new();
1567 for (shard_id, metrics) in shards.iter() {
1568 if let Some(metrics) = metrics.upgrade() {
1569 f(&metrics);
1570 } else {
1571 deleted_shards.push(shard_id.clone());
1572 }
1573 }
1574 for deleted_shard_id in deleted_shards {
1575 assert!(shards.remove(&deleted_shard_id).is_some());
1576 }
1577 }
1578}
1579
/// Per-shard metric handles, each a child of the corresponding vec in
/// [`ShardsMetrics`] labeled with this shard's id and `name`.
///
/// The `DeleteOnDrop*` handles remove their label values from the parent vec
/// when this struct is dropped.
#[derive(Debug)]
pub struct ShardMetrics {
    pub shard_id: ShardId,
    pub name: String,
    pub since: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub upper: DeleteOnDropGauge<AtomicI64, Vec<String>>,
    pub largest_batch_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // Child of the `mz_persist_shard_rollup_size_bytes` vec.
    pub latest_rollup_size: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub encoded_diff_size: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub hollow_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub spine_batch_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub batch_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    // The version-labeled vecs are kept whole (not resolved to a child here),
    // alongside a map populated elsewhere that holds per-version handles.
    batch_part_version_count: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_bytes: mz_ore::metrics::UIntGaugeVec,
    batch_part_version_map: Mutex<BTreeMap<String, BatchPartVersionMetrics>>,
    pub update_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub rollup_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_held: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub seqnos_since_last_rollup: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_seqno_held_parts: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_live_diffs: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_current_state_batches_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_current_state_rollups_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_referenced_not_current_state_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_not_leaked_not_referenced_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub usage_leaked_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub gc_finished: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub compaction_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub cmd_succeeded: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_applied: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_not_applied_stale: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub pubsub_push_diff_not_applied_out_of_order: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub stale_version: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub blob_gets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub blob_sets: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    pub live_writers: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub unconsolidated_snapshot: DeleteOnDropCounter<AtomicU64, Vec<String>>,
    // Backpressure handles are wrapped in `Arc`, presumably so they can be
    // shared with other components — confirm at use sites.
    pub backpressure_emitted_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    pub backpressure_last_backpressured_bytes: Arc<DeleteOnDropGauge<AtomicU64, Vec<String>>>,
    pub backpressure_retired_bytes: Arc<DeleteOnDropCounter<AtomicU64, Vec<String>>>,
    pub rewrite_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_part_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_part_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub compact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub compacting_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub noncompact_batches: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub schema_registry_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    pub inline_backpressure_count: DeleteOnDropCounter<AtomicU64, Vec<String>>,
}
1629
1630impl ShardMetrics {
1631 pub fn new(shard_id: &ShardId, name: &str, shards_metrics: &ShardsMetrics) -> Self {
1632 let shard = shard_id.to_string();
1633 ShardMetrics {
1634 shard_id: *shard_id,
1635 name: name.to_string(),
1636 since: shards_metrics
1637 .since
1638 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1639 upper: shards_metrics
1640 .upper
1641 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1642 latest_rollup_size: shards_metrics
1643 .encoded_rollup_size
1644 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1645 encoded_diff_size: shards_metrics
1646 .encoded_diff_size
1647 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1648 hollow_batch_count: shards_metrics
1649 .hollow_batch_count
1650 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1651 spine_batch_count: shards_metrics
1652 .spine_batch_count
1653 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1654 batch_part_count: shards_metrics
1655 .batch_part_count
1656 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1657 batch_part_version_count: shards_metrics.batch_part_version_count.clone(),
1658 batch_part_version_bytes: shards_metrics.batch_part_version_bytes.clone(),
1659 batch_part_version_map: Mutex::new(BTreeMap::new()),
1660 update_count: shards_metrics
1661 .update_count
1662 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1663 rollup_count: shards_metrics
1664 .rollup_count
1665 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1666 largest_batch_size: shards_metrics
1667 .largest_batch_size
1668 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1669 seqnos_held: shards_metrics
1670 .seqnos_held
1671 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1672 seqnos_since_last_rollup: shards_metrics
1673 .seqnos_since_last_rollup
1674 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1675 gc_seqno_held_parts: shards_metrics
1676 .gc_seqno_held_parts
1677 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1678 gc_live_diffs: shards_metrics
1679 .gc_live_diffs
1680 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1681 gc_finished: shards_metrics
1682 .gc_finished
1683 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1684 compaction_applied: shards_metrics
1685 .compaction_applied
1686 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1687 cmd_succeeded: shards_metrics
1688 .cmd_succeeded
1689 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1690 usage_current_state_batches_bytes: shards_metrics
1691 .usage_current_state_batches_bytes
1692 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1693 usage_current_state_rollups_bytes: shards_metrics
1694 .usage_current_state_rollups_bytes
1695 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1696 usage_referenced_not_current_state_bytes: shards_metrics
1697 .usage_referenced_not_current_state_bytes
1698 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1699 usage_not_leaked_not_referenced_bytes: shards_metrics
1700 .usage_not_leaked_not_referenced_bytes
1701 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1702 usage_leaked_bytes: shards_metrics
1703 .usage_leaked_bytes
1704 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1705 pubsub_push_diff_applied: shards_metrics
1706 .pubsub_push_diff_applied
1707 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1708 pubsub_push_diff_not_applied_stale: shards_metrics
1709 .pubsub_push_diff_not_applied_stale
1710 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1711 pubsub_push_diff_not_applied_out_of_order: shards_metrics
1712 .pubsub_push_diff_not_applied_out_of_order
1713 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1714 stale_version: shards_metrics
1715 .stale_version
1716 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1717 blob_gets: shards_metrics
1718 .blob_gets
1719 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1720 blob_sets: shards_metrics
1721 .blob_sets
1722 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1723 live_writers: shards_metrics
1724 .live_writers
1725 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1726 unconsolidated_snapshot: shards_metrics
1727 .unconsolidated_snapshot
1728 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1729 backpressure_emitted_bytes: Arc::new(
1730 shards_metrics
1731 .backpressure_emitted_bytes
1732 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1733 ),
1734 backpressure_last_backpressured_bytes: Arc::new(
1735 shards_metrics
1736 .backpressure_last_backpressured_bytes
1737 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1738 ),
1739 backpressure_retired_bytes: Arc::new(
1740 shards_metrics
1741 .backpressure_retired_bytes
1742 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1743 ),
1744 rewrite_part_count: shards_metrics
1745 .rewrite_part_count
1746 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1747 inline_part_count: shards_metrics
1748 .inline_part_count
1749 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1750 inline_part_bytes: shards_metrics
1751 .inline_part_bytes
1752 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1753 compact_batches: shards_metrics
1754 .compact_batches
1755 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1756 compacting_batches: shards_metrics
1757 .compacting_batches
1758 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1759 noncompact_batches: shards_metrics
1760 .noncompact_batches
1761 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1762 schema_registry_version_count: shards_metrics
1763 .schema_registry_version_count
1764 .get_delete_on_drop_metric(vec![shard.clone(), name.to_string()]),
1765 inline_backpressure_count: shards_metrics
1766 .inline_backpressure_count
1767 .get_delete_on_drop_metric(vec![shard, name.to_string()]),
1768 }
1769 }
1770
1771 pub fn set_since<T: Codec64>(&self, since: &Antichain<T>) {
1772 self.since.set(encode_ts_metric(since))
1773 }
1774
1775 pub fn set_upper<T: Codec64>(&self, upper: &Antichain<T>) {
1776 self.upper.set(encode_ts_metric(upper))
1777 }
1778
1779 pub(crate) fn set_batch_part_versions<'a>(
1780 &self,
1781 batch_parts_by_version: impl Iterator<Item = (&'a str, usize)>,
1782 ) {
1783 let mut map = self
1784 .batch_part_version_map
1785 .lock()
1786 .expect("mutex should not be poisoned");
1787 for x in map.values() {
1794 x.batch_part_version_count.set(0);
1795 x.batch_part_version_bytes.set(0);
1796 }
1797
1798 for (key, bytes) in batch_parts_by_version {
1801 if !map.contains_key(key) {
1802 map.insert(
1803 key.to_owned(),
1804 BatchPartVersionMetrics {
1805 batch_part_version_count: self
1806 .batch_part_version_count
1807 .get_delete_on_drop_metric(vec![
1808 self.shard_id.to_string(),
1809 self.name.clone(),
1810 key.to_owned(),
1811 ]),
1812 batch_part_version_bytes: self
1813 .batch_part_version_bytes
1814 .get_delete_on_drop_metric(vec![
1815 self.shard_id.to_string(),
1816 self.name.clone(),
1817 key.to_owned(),
1818 ]),
1819 },
1820 );
1821 }
1822 let value = map.get(key).expect("inserted above");
1823 value.batch_part_version_count.inc();
1824 value.batch_part_version_bytes.add(u64::cast_from(bytes));
1825 }
1826 }
1827}
1828
/// Pair of gauges tracking one shard's batch parts for a single version key.
///
/// Instances are labelled `[shard_id, name, version]` by
/// `set_batch_part_versions`, which resets and re-accumulates both gauges on
/// every call.
#[derive(Debug)]
pub struct BatchPartVersionMetrics {
    /// Number of `(version, bytes)` entries reported for this version in the
    /// most recent `set_batch_part_versions` call.
    pub batch_part_version_count: DeleteOnDropGauge<AtomicU64, Vec<String>>,
    /// Sum of the byte sizes reported for this version in that same call.
    pub batch_part_version_bytes: DeleteOnDropGauge<AtomicU64, Vec<String>>,
}
1834
1835#[derive(Debug)]
1837pub struct UsageAuditMetrics {
1838 pub blob_batch_part_bytes: UIntGauge,
1840 pub blob_batch_part_count: UIntGauge,
1842 pub blob_rollup_bytes: UIntGauge,
1844 pub blob_rollup_count: UIntGauge,
1846 pub blob_bytes: UIntGauge,
1848 pub blob_count: UIntGauge,
1850 pub step_blob_metadata: Counter,
1852 pub step_state: Counter,
1854 pub step_math: Counter,
1856}
1857
1858impl UsageAuditMetrics {
1859 fn new(registry: &MetricsRegistry) -> Self {
1860 let step_timings: CounterVec = registry.register(metric!(
1861 name: "mz_persist_audit_step_seconds",
1862 help: "time spent on individual steps of audit",
1863 var_labels: ["step"],
1864 ));
1865 UsageAuditMetrics {
1866 blob_batch_part_bytes: registry.register(metric!(
1867 name: "mz_persist_audit_blob_batch_part_bytes",
1868 help: "total size of batch parts in blob",
1869 )),
1870 blob_batch_part_count: registry.register(metric!(
1871 name: "mz_persist_audit_blob_batch_part_count",
1872 help: "count of batch parts in blob",
1873 )),
1874 blob_rollup_bytes: registry.register(metric!(
1875 name: "mz_persist_audit_blob_rollup_bytes",
1876 help: "total size of state rollups stored in blob",
1877 )),
1878 blob_rollup_count: registry.register(metric!(
1879 name: "mz_persist_audit_blob_rollup_count",
1880 help: "count of all state rollups in blob",
1881 )),
1882 blob_bytes: registry.register(metric!(
1883 name: "mz_persist_audit_blob_bytes",
1884 help: "total size of blob",
1885 )),
1886 blob_count: registry.register(metric!(
1887 name: "mz_persist_audit_blob_count",
1888 help: "count of all blobs",
1889 )),
1890 step_blob_metadata: step_timings.with_label_values(&["blob_metadata"]),
1891 step_state: step_timings.with_label_values(&["state"]),
1892 step_math: step_timings.with_label_values(&["math"]),
1893 }
1894 }
1895}
1896
1897#[derive(Debug)]
1900pub enum UpdateDelta {
1901 Negative(u64),
1903 NonNegative(u64),
1905}
1906
1907impl UpdateDelta {
1908 pub fn new(new: usize, old: usize) -> Self {
1911 if new < old {
1912 UpdateDelta::Negative(CastFrom::cast_from(old - new))
1913 } else {
1914 UpdateDelta::NonNegative(CastFrom::cast_from(new - old))
1915 }
1916 }
1917}
1918
1919#[derive(Debug, Clone)]
1922pub struct SinkMetrics {
1923 correction_insertions_total: IntCounter,
1925 correction_deletions_total: IntCounter,
1927 correction_capacity_increases_total: IntCounter,
1929 correction_capacity_decreases_total: IntCounter,
1931 correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
1933 correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
1935}
1936
1937impl SinkMetrics {
1938 fn new(registry: &MetricsRegistry) -> Self {
1939 SinkMetrics {
1940 correction_insertions_total: registry.register(metric!(
1941 name: "mz_persist_sink_correction_insertions_total",
1942 help: "The cumulative insertions observed on the correction buffer across workers and persist sinks.",
1943 )),
1944 correction_deletions_total: registry.register(metric!(
1945 name: "mz_persist_sink_correction_deletions_total",
1946 help: "The cumulative deletions observed on the correction buffer across workers and persist sinks.",
1947 )),
1948 correction_capacity_increases_total: registry.register(metric!(
1949 name: "mz_persist_sink_correction_capacity_increases_total",
1950 help: "The cumulative capacity increases observed on the correction buffer across workers and persist sinks.",
1951 )),
1952 correction_capacity_decreases_total: registry.register(metric!(
1953 name: "mz_persist_sink_correction_capacity_decreases_total",
1954 help: "The cumulative capacity decreases observed on the correction buffer across workers and persist sinks.",
1955 )),
1956 correction_max_per_sink_worker_len_updates: registry.register(metric!(
1957 name: "mz_persist_sink_correction_max_per_sink_worker_len_updates",
1958 help: "The maximum length observed for the correction buffer of any single persist sink per worker.",
1959 var_labels: ["worker_id"],
1960 )),
1961 correction_max_per_sink_worker_capacity_updates: registry.register(metric!(
1962 name: "mz_persist_sink_correction_max_per_sink_worker_capacity_updates",
1963 help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
1964 var_labels: ["worker_id"],
1965 )),
1966 }
1967 }
1968
1969 pub fn for_worker(&self, worker_id: usize) -> SinkWorkerMetrics {
1974 let worker = worker_id.to_string();
1975 let correction_max_per_sink_worker_len_updates = self
1976 .correction_max_per_sink_worker_len_updates
1977 .with_label_values(&[&worker]);
1978 let correction_max_per_sink_worker_capacity_updates = self
1979 .correction_max_per_sink_worker_capacity_updates
1980 .with_label_values(&[&worker]);
1981 SinkWorkerMetrics {
1982 correction_max_per_sink_worker_len_updates,
1983 correction_max_per_sink_worker_capacity_updates,
1984 }
1985 }
1986
1987 pub fn report_correction_update_deltas(
1993 &self,
1994 correction_len_delta: UpdateDelta,
1995 correction_cap_delta: UpdateDelta,
1996 ) {
1997 match correction_len_delta {
1999 UpdateDelta::NonNegative(delta) => {
2000 if delta > 0 {
2001 self.correction_insertions_total.inc_by(delta)
2002 }
2003 }
2004 UpdateDelta::Negative(delta) => self.correction_deletions_total.inc_by(delta),
2005 }
2006 match correction_cap_delta {
2008 UpdateDelta::NonNegative(delta) => {
2009 if delta > 0 {
2010 self.correction_capacity_increases_total.inc_by(delta)
2011 }
2012 }
2013 UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
2014 }
2015 }
2016}
2017
2018#[derive(Clone, Debug)]
2020pub struct SinkWorkerMetrics {
2021 correction_max_per_sink_worker_len_updates: UIntGauge,
2022 correction_max_per_sink_worker_capacity_updates: UIntGauge,
2023}
2024
2025impl SinkWorkerMetrics {
2026 pub fn report_correction_update_totals(&self, correction_len: usize, correction_cap: usize) {
2031 let correction_len = CastFrom::cast_from(correction_len);
2033 if correction_len > self.correction_max_per_sink_worker_len_updates.get() {
2034 self.correction_max_per_sink_worker_len_updates
2035 .set(correction_len);
2036 }
2037 let correction_cap = CastFrom::cast_from(correction_cap);
2038 if correction_cap > self.correction_max_per_sink_worker_capacity_updates.get() {
2039 self.correction_max_per_sink_worker_capacity_updates
2040 .set(correction_cap);
2041 }
2042 }
2043}
2044
2045#[derive(Debug)]
2047pub struct AlertsMetrics {
2048 pub(crate) blob_failures: IntCounter,
2049 pub(crate) consensus_failures: IntCounter,
2050}
2051
2052impl AlertsMetrics {
2053 fn new(registry: &MetricsRegistry) -> Self {
2054 AlertsMetrics {
2055 blob_failures: registry.register(metric!(
2056 name: "mz_persist_blob_failures",
2057 help: "count of all blob operation failures",
2058 const_labels: {"honeycomb" => "import"},
2059 )),
2060 consensus_failures: registry.register(metric!(
2061 name: "mz_persist_consensus_failures",
2062 help: "count of determinate consensus operation failures",
2063 const_labels: {"honeycomb" => "import"},
2064 )),
2065 }
2066 }
2067}
2068
2069#[derive(Debug)]
2071pub struct PubSubServerMetrics {
2072 pub(crate) active_connections: UIntGauge,
2073 pub(crate) broadcasted_diff_count: IntCounter,
2074 pub(crate) broadcasted_diff_bytes: IntCounter,
2075 pub(crate) broadcasted_diff_dropped_channel_full: IntCounter,
2076
2077 pub(crate) push_seconds: Counter,
2078 pub(crate) subscribe_seconds: Counter,
2079 pub(crate) unsubscribe_seconds: Counter,
2080 pub(crate) connection_cleanup_seconds: Counter,
2081
2082 pub(crate) push_call_count: IntCounter,
2083 pub(crate) subscribe_call_count: IntCounter,
2084 pub(crate) unsubscribe_call_count: IntCounter,
2085}
2086
2087impl PubSubServerMetrics {
2088 pub(crate) fn new(registry: &MetricsRegistry) -> Self {
2089 let op_timings: CounterVec = registry.register(metric!(
2090 name: "mz_persist_pubsub_server_operation_seconds",
2091 help: "time spent in pubsub server performing each operation",
2092 var_labels: ["op"],
2093 ));
2094 let call_count: IntCounterVec = registry.register(metric!(
2095 name: "mz_persist_pubsub_server_call_count",
2096 help: "count of each pubsub server message received",
2097 var_labels: ["call"],
2098 ));
2099
2100 Self {
2101 active_connections: registry.register(metric!(
2102 name: "mz_persist_pubsub_server_active_connections",
2103 help: "number of active connections to server",
2104 )),
2105 broadcasted_diff_count: registry.register(metric!(
2106 name: "mz_persist_pubsub_server_broadcasted_diff_count",
2107 help: "count of total broadcast diff messages sent",
2108 )),
2109 broadcasted_diff_bytes: registry.register(metric!(
2110 name: "mz_persist_pubsub_server_broadcasted_diff_bytes",
2111 help: "count of total broadcast diff bytes sent",
2112 )),
2113 broadcasted_diff_dropped_channel_full: registry.register(metric!(
2114 name: "mz_persist_pubsub_server_broadcasted_diff_dropped_channel_full",
2115 help: "count of diffs dropped due to full connection channel",
2116 )),
2117
2118 push_seconds: op_timings.with_label_values(&["push"]),
2119 subscribe_seconds: op_timings.with_label_values(&["subscribe"]),
2120 unsubscribe_seconds: op_timings.with_label_values(&["unsubscribe"]),
2121 connection_cleanup_seconds: op_timings.with_label_values(&["cleanup"]),
2122
2123 push_call_count: call_count.with_label_values(&["push"]),
2124 subscribe_call_count: call_count.with_label_values(&["subscribe"]),
2125 unsubscribe_call_count: call_count.with_label_values(&["unsubscribe"]),
2126 }
2127 }
2128}
2129
2130#[derive(Debug)]
2132pub struct PubSubClientMetrics {
2133 pub sender: PubSubClientSenderMetrics,
2134 pub receiver: PubSubClientReceiverMetrics,
2135 pub grpc_connection: PubSubGrpcClientConnectionMetrics,
2136}
2137
2138impl PubSubClientMetrics {
2139 fn new(registry: &MetricsRegistry) -> Self {
2140 PubSubClientMetrics {
2141 sender: PubSubClientSenderMetrics::new(registry),
2142 receiver: PubSubClientReceiverMetrics::new(registry),
2143 grpc_connection: PubSubGrpcClientConnectionMetrics::new(registry),
2144 }
2145 }
2146}
2147
2148#[derive(Debug)]
2149pub struct PubSubGrpcClientConnectionMetrics {
2150 pub(crate) connected: UIntGauge,
2151 pub(crate) connection_established_count: IntCounter,
2152 pub(crate) connect_call_attempt_count: IntCounter,
2153 pub(crate) broadcast_recv_lagged_count: IntCounter,
2154 pub(crate) grpc_error_count: IntCounter,
2155}
2156
2157impl PubSubGrpcClientConnectionMetrics {
2158 fn new(registry: &MetricsRegistry) -> Self {
2159 Self {
2160 connected: registry.register(metric!(
2161 name: "mz_persist_pubsub_client_grpc_connected",
2162 help: "whether the grpc client is currently connected",
2163 )),
2164 connection_established_count: registry.register(metric!(
2165 name: "mz_persist_pubsub_client_grpc_connection_established_count",
2166 help: "count of grpc connection establishments to pubsub server",
2167 )),
2168 connect_call_attempt_count: registry.register(metric!(
2169 name: "mz_persist_pubsub_client_grpc_connect_call_attempt_count",
2170 help: "count of connection call attempts (including retries) to pubsub server",
2171 )),
2172 broadcast_recv_lagged_count: registry.register(metric!(
2173 name: "mz_persist_pubsub_client_grpc_broadcast_recv_lagged_count",
2174 help: "times a message was missed by broadcast receiver due to lag",
2175 )),
2176 grpc_error_count: registry.register(metric!(
2177 name: "mz_persist_pubsub_client_grpc_error_count",
2178 help: "count of grpc errors received",
2179 )),
2180 }
2181 }
2182}
2183
2184#[derive(Clone, Debug)]
2185pub struct PubSubClientReceiverMetrics {
2186 pub(crate) push_received: IntCounter,
2187 pub(crate) unknown_message_received: IntCounter,
2188 pub(crate) approx_diff_latency_seconds: Histogram,
2189
2190 pub(crate) state_pushed_diff_fast_path: IntCounter,
2191 pub(crate) state_pushed_diff_slow_path_succeeded: IntCounter,
2192 pub(crate) state_pushed_diff_slow_path_failed: IntCounter,
2193}
2194
2195impl PubSubClientReceiverMetrics {
2196 fn new(registry: &MetricsRegistry) -> Self {
2197 let call_received: IntCounterVec = registry.register(metric!(
2198 name: "mz_persist_pubsub_client_call_received",
2199 help: "times a pubsub client call was received",
2200 var_labels: ["call"],
2201 ));
2202
2203 Self {
2204 push_received: call_received.with_label_values(&["push"]),
2205 unknown_message_received: call_received.with_label_values(&["unknown"]),
2206 approx_diff_latency_seconds: registry.register(metric!(
2207 name: "mz_persist_pubsub_client_approx_diff_apply_latency_seconds",
2208 help: "histogram of (approximate) latency between sending a diff and applying it",
2209 buckets: prometheus::exponential_buckets(0.001, 2.0, 13).expect("buckets"),
2210 )),
2211
2212 state_pushed_diff_fast_path: registry.register(metric!(
2213 name: "mz_persist_pubsub_client_receiver_state_push_diff_fast_path",
2214 help: "count fast-path state push_diff calls",
2215 )),
2216 state_pushed_diff_slow_path_succeeded: registry.register(metric!(
2217 name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_succeeded",
2218 help: "count of successful slow-path state push_diff calls",
2219 )),
2220 state_pushed_diff_slow_path_failed: registry.register(metric!(
2221 name: "mz_persist_pubsub_client_receiver_state_push_diff_slow_path_failed",
2222 help: "count of unsuccessful slow-path state push_diff calls",
2223 )),
2224 }
2225 }
2226}
2227
2228#[derive(Debug)]
2229pub struct PubSubClientSenderMetrics {
2230 pub push: PubSubClientCallMetrics,
2231 pub subscribe: PubSubClientCallMetrics,
2232 pub unsubscribe: PubSubClientCallMetrics,
2233}
2234
2235#[derive(Debug)]
2236pub struct PubSubClientCallMetrics {
2237 pub(crate) succeeded: IntCounter,
2238 pub(crate) bytes_sent: IntCounter,
2239 pub(crate) failed: IntCounter,
2240}
2241
2242impl PubSubClientSenderMetrics {
2243 fn new(registry: &MetricsRegistry) -> Self {
2244 let call_bytes_sent: IntCounterVec = registry.register(metric!(
2245 name: "mz_persist_pubsub_client_call_bytes_sent",
2246 help: "number of bytes sent for a given pubsub client call",
2247 var_labels: ["call"],
2248 ));
2249 let call_succeeded: IntCounterVec = registry.register(metric!(
2250 name: "mz_persist_pubsub_client_call_succeeded",
2251 help: "times a pubsub client call succeeded",
2252 var_labels: ["call"],
2253 ));
2254 let call_failed: IntCounterVec = registry.register(metric!(
2255 name: "mz_persist_pubsub_client_call_failed",
2256 help: "times a pubsub client call failed",
2257 var_labels: ["call"],
2258 ));
2259
2260 Self {
2261 push: PubSubClientCallMetrics {
2262 succeeded: call_succeeded.with_label_values(&["push"]),
2263 failed: call_failed.with_label_values(&["push"]),
2264 bytes_sent: call_bytes_sent.with_label_values(&["push"]),
2265 },
2266 subscribe: PubSubClientCallMetrics {
2267 succeeded: call_succeeded.with_label_values(&["subscribe"]),
2268 failed: call_failed.with_label_values(&["subscribe"]),
2269 bytes_sent: call_bytes_sent.with_label_values(&["subscribe"]),
2270 },
2271 unsubscribe: PubSubClientCallMetrics {
2272 succeeded: call_succeeded.with_label_values(&["unsubscribe"]),
2273 failed: call_failed.with_label_values(&["unsubscribe"]),
2274 bytes_sent: call_bytes_sent.with_label_values(&["unsubscribe"]),
2275 },
2276 }
2277 }
2278}
2279
/// Lock acquisition metrics, one [`LockMetrics`] per lock site.
///
/// NOTE(review): the exact lock each field refers to is inferred from its name
/// (applier read cacheable/noncacheable, applier write, watch); confirm at the
/// construction site.
#[derive(Debug)]
pub struct LocksMetrics {
    pub(crate) applier_read_cacheable: LockMetrics,
    pub(crate) applier_read_noncacheable: LockMetrics,
    pub(crate) applier_write: LockMetrics,
    pub(crate) watch: LockMetrics,
}
2287
/// Acquisition counters for a single lock.
#[derive(Debug, Clone)]
pub struct LockMetrics {
    /// Presumably the count of all acquisitions; confirm at call sites.
    pub(crate) acquire_count: IntCounter,
    /// Presumably the count of acquisitions that could not proceed immediately.
    pub(crate) blocking_acquire_count: IntCounter,
    /// Presumably the total seconds spent waiting in blocking acquisitions.
    pub(crate) blocking_seconds: Counter,
}
2294
2295#[derive(Debug)]
2296pub struct WatchMetrics {
2297 pub(crate) wait_woken_via_watch: IntCounter,
2298 pub(crate) wait_woken_via_sleep: IntCounter,
2299 pub(crate) wait_resolved_via_watch: IntCounter,
2300 pub(crate) wait_resolved_via_sleep: IntCounter,
2301 pub(crate) notify_sent: IntCounter,
2302 pub(crate) notify_noop: IntCounter,
2303 pub(crate) notify_recv: IntCounter,
2304 pub(crate) notify_lagged: IntCounter,
2305 pub(crate) notify_wait_started: IntCounter,
2306 pub(crate) notify_wait_finished: IntCounter,
2307}
2308
2309impl WatchMetrics {
2310 fn new(registry: &MetricsRegistry) -> Self {
2311 WatchMetrics {
2312 wait_woken_via_watch: registry.register(metric!(
2313 name: "mz_persist_wait_woken_via_watch",
2314 help: "count of wait-for-uppers wakes via watch notify",
2315 )),
2316 wait_woken_via_sleep: registry.register(metric!(
2317 name: "mz_persist_wait_woken_via_sleep",
2318 help: "count of wait-for-uppers wakes via sleep",
2319 )),
2320 wait_resolved_via_watch: registry.register(metric!(
2321 name: "mz_persist_wait_resolved_via_watch",
2322 help: "count of wait-for-uppers resolved via watch notify",
2323 )),
2324 wait_resolved_via_sleep: registry.register(metric!(
2325 name: "mz_persist_wait_resolved_via_sleep",
2326 help: "count of wait-for-uppers resolved via sleep",
2327 )),
2328 notify_sent: registry.register(metric!(
2329 name: "mz_persist_watch_notify_sent",
2330 help: "count of watch notifications sent to a non-empty broadcast channel",
2331 )),
2332 notify_noop: registry.register(metric!(
2333 name: "mz_persist_watch_notify_noop",
2334 help: "count of watch notifications sent to an broadcast channel",
2335 )),
2336 notify_recv: registry.register(metric!(
2337 name: "mz_persist_watch_notify_recv",
2338 help: "count of watch notifications received from the broadcast channel",
2339 )),
2340 notify_lagged: registry.register(metric!(
2341 name: "mz_persist_watch_notify_lagged",
2342 help: "count of lagged events in the watch notification broadcast channel",
2343 )),
2344 notify_wait_started: registry.register(metric!(
2345 name: "mz_persist_watch_notify_wait_started",
2346 help: "count of watch wait calls started",
2347 )),
2348 notify_wait_finished: registry.register(metric!(
2349 name: "mz_persist_watch_notify_wait_finished",
2350 help: "count of watch wait calls resolved",
2351 )),
2352 }
2353 }
2354}
2355
2356#[derive(Debug)]
2357pub struct PushdownMetrics {
2358 pub(crate) parts_filtered_count: IntCounter,
2359 pub(crate) parts_filtered_bytes: IntCounter,
2360 pub(crate) parts_fetched_count: IntCounter,
2361 pub(crate) parts_fetched_bytes: IntCounter,
2362 pub(crate) parts_audited_count: IntCounter,
2363 pub(crate) parts_audited_bytes: IntCounter,
2364 pub(crate) parts_inline_count: IntCounter,
2365 pub(crate) parts_inline_bytes: IntCounter,
2366 pub(crate) parts_faked_count: IntCounter,
2367 pub(crate) parts_faked_bytes: IntCounter,
2368 pub(crate) parts_stats_trimmed_count: IntCounter,
2369 pub(crate) parts_stats_trimmed_bytes: IntCounter,
2370 pub(crate) parts_projection_trimmed_bytes: IntCounter,
2371 pub part_stats: PartStatsMetrics,
2372}
2373
2374impl PushdownMetrics {
2375 fn new(registry: &MetricsRegistry) -> Self {
2376 PushdownMetrics {
2377 parts_filtered_count: registry.register(metric!(
2378 name: "mz_persist_pushdown_parts_filtered_count",
2379 help: "count of parts filtered by pushdown",
2380 )),
2381 parts_filtered_bytes: registry.register(metric!(
2382 name: "mz_persist_pushdown_parts_filtered_bytes",
2383 help: "total size of parts filtered by pushdown in bytes",
2384 )),
2385 parts_fetched_count: registry.register(metric!(
2386 name: "mz_persist_pushdown_parts_fetched_count",
2387 help: "count of parts not filtered by pushdown",
2388 )),
2389 parts_fetched_bytes: registry.register(metric!(
2390 name: "mz_persist_pushdown_parts_fetched_bytes",
2391 help: "total size of parts not filtered by pushdown in bytes",
2392 )),
2393 parts_audited_count: registry.register(metric!(
2394 name: "mz_persist_pushdown_parts_audited_count",
2395 help: "count of parts fetched only for pushdown audit",
2396 )),
2397 parts_audited_bytes: registry.register(metric!(
2398 name: "mz_persist_pushdown_parts_audited_bytes",
2399 help: "total size of parts fetched only for pushdown audit",
2400 )),
2401 parts_inline_count: registry.register(metric!(
2402 name: "mz_persist_pushdown_parts_inline_count",
2403 help: "count of parts not fetched because they were inline",
2404 )),
2405 parts_inline_bytes: registry.register(metric!(
2406 name: "mz_persist_pushdown_parts_inline_bytes",
2407 help: "total size of parts not fetched because they were inline",
2408 )),
2409 parts_faked_count: registry.register(metric!(
2410 name: "mz_persist_pushdown_parts_faked_count",
2411 help: "count of parts faked because of aggressive projection pushdown",
2412 )),
2413 parts_faked_bytes: registry.register(metric!(
2414 name: "mz_persist_pushdown_parts_faked_bytes",
2415 help: "total size of parts replaced with fakes by aggressive projection pushdown",
2416 )),
2417 parts_stats_trimmed_count: registry.register(metric!(
2418 name: "mz_persist_pushdown_parts_stats_trimmed_count",
2419 help: "count of trimmed part stats",
2420 )),
2421 parts_stats_trimmed_bytes: registry.register(metric!(
2422 name: "mz_persist_pushdown_parts_stats_trimmed_bytes",
2423 help: "total bytes trimmed from part stats",
2424 )),
2425 parts_projection_trimmed_bytes: registry.register(metric!(
2426 name: "mz_persist_pushdown_parts_projection_trimmed_bytes",
2427 help: "total bytes trimmed from columnar data because of projection pushdown",
2428 )),
2429 part_stats: PartStatsMetrics::new(registry),
2430 }
2431 }
2432}
2433
2434#[derive(Debug)]
2435pub struct ConsolidationMetrics {
2436 pub(crate) parts_fetched: IntCounter,
2437 pub(crate) parts_skipped: IntCounter,
2438 pub(crate) parts_wasted: IntCounter,
2439 pub(crate) wrong_sort: IntCounter,
2440}
2441
2442impl ConsolidationMetrics {
2443 fn new(registry: &MetricsRegistry) -> Self {
2444 ConsolidationMetrics {
2445 parts_fetched: registry.register(metric!(
2446 name: "mz_persist_consolidation_parts_fetched_count",
2447 help: "count of parts that were fetched and used during consolidation",
2448 )),
2449 parts_skipped: registry.register(metric!(
2450 name: "mz_persist_consolidation_parts_skipped_count",
2451 help: "count of parts that were never needed during consolidation",
2452 )),
2453 parts_wasted: registry.register(metric!(
2454 name: "mz_persist_consolidation_parts_wasted_count",
2455 help: "count of parts that were fetched but not needed during consolidation",
2456 )),
2457 wrong_sort: registry.register(metric!(
2458 name: "mz_persist_consolidation_wrong_sort_count",
2459 help: "count of runs that were sorted using the wrong ordering for the current consolidation",
2460 )),
2461 }
2462 }
2463}
2464
2465#[derive(Debug)]
2466pub struct BlobMemCache {
2467 pub(crate) size_blobs: UIntGauge,
2468 pub(crate) size_bytes: UIntGauge,
2469 pub(crate) hits_blobs: IntCounter,
2470 pub(crate) hits_bytes: IntCounter,
2471 pub(crate) evictions: IntCounter,
2472}
2473
2474impl BlobMemCache {
2475 fn new(registry: &MetricsRegistry) -> Self {
2476 BlobMemCache {
2477 size_blobs: registry.register(metric!(
2478 name: "mz_persist_blob_cache_size_blobs",
2479 help: "count of blobs in the cache",
2480 const_labels: {"cache" => "mem"},
2481 )),
2482 size_bytes: registry.register(metric!(
2483 name: "mz_persist_blob_cache_size_bytes",
2484 help: "total size of blobs in the cache",
2485 const_labels: {"cache" => "mem"},
2486 )),
2487 hits_blobs: registry.register(metric!(
2488 name: "mz_persist_blob_cache_hits_blobs",
2489 help: "count of blobs served via cache instead of s3",
2490 const_labels: {"cache" => "mem"},
2491 )),
2492 hits_bytes: registry.register(metric!(
2493 name: "mz_persist_blob_cache_hits_bytes",
2494 help: "total size of blobs served via cache instead of s3",
2495 const_labels: {"cache" => "mem"},
2496 )),
2497 evictions: registry.register(metric!(
2498 name: "mz_persist_blob_cache_evictions",
2499 help: "count of capacity-based cache evictions",
2500 const_labels: {"cache" => "mem"},
2501 )),
2502 }
2503 }
2504}
2505
2506#[derive(Debug)]
2507pub struct SemaphoreMetrics {
2508 cfg: PersistConfig,
2509 registry: MetricsRegistry,
2510 fetch: OnceCell<MetricsSemaphore>,
2511}
2512
2513impl SemaphoreMetrics {
2514 fn new(cfg: PersistConfig, registry: MetricsRegistry) -> Self {
2515 SemaphoreMetrics {
2516 cfg,
2517 registry,
2518 fetch: OnceCell::new(),
2519 }
2520 }
2521
2522 async fn fetch(&self) -> &MetricsSemaphore {
2526 if let Some(x) = self.fetch.get() {
2527 return x;
2529 }
2530 let cfg = self.cfg.clone();
2531 let registry = self.registry.clone();
2532 let init = async move {
2533 let total_permits = match cfg.announce_memory_limit {
2534 Some(mem) if cfg.is_cc_active => {
2537 info!("fetch semaphore awaiting first dyncfg values");
2540 let () = cfg.configs_synced_once().await;
2541 let total_permits = usize::cast_lossy(
2542 f64::cast_lossy(mem) * FETCH_SEMAPHORE_PERMIT_ADJUSTMENT.get(&cfg),
2543 );
2544 info!("fetch_semaphore got first dyncfg values");
2545 total_permits
2546 }
2547 Some(_) | None => Semaphore::MAX_PERMITS,
2548 };
2549 MetricsSemaphore::new(®istry, "fetch", total_permits)
2550 };
2551 self.fetch.get_or_init(|| init).await
2552 }
2553
2554 pub(crate) async fn acquire_fetch_permits(&self, encoded_size_bytes: usize) -> MetricsPermits {
2555 let requested_permits = f64::cast_lossy(encoded_size_bytes);
2558 let requested_permits = requested_permits * FETCH_SEMAPHORE_COST_ADJUSTMENT.get(&self.cfg);
2559 let requested_permits = usize::cast_lossy(requested_permits);
2560 self.fetch().await.acquire_permits(requested_permits).await
2561 }
2562}
2563
2564#[derive(Debug)]
2565pub struct MetricsSemaphore {
2566 name: &'static str,
2567 semaphore: Arc<Semaphore>,
2568 total_permits: usize,
2569 acquire_count: IntCounter,
2570 blocking_count: IntCounter,
2571 blocking_seconds: Counter,
2572 acquired_permits: IntCounter,
2573 released_permits: IntCounter,
2574 _available_permits: ComputedUIntGauge,
2575}
2576
2577impl MetricsSemaphore {
2578 pub fn new(registry: &MetricsRegistry, name: &'static str, total_permits: usize) -> Self {
2579 let total_permits = std::cmp::min(total_permits, Semaphore::MAX_PERMITS);
2580 let semaphore = Arc::new(Semaphore::new(total_permits));
2583 MetricsSemaphore {
2584 name,
2585 total_permits,
2586 acquire_count: registry.register(metric!(
2587 name: "mz_persist_semaphore_acquire_count",
2588 help: "count of acquire calls (not acquired permits count)",
2589 const_labels: {"name" => name},
2590 )),
2591 blocking_count: registry.register(metric!(
2592 name: "mz_persist_semaphore_blocking_count",
2593 help: "count of acquire calls that had to block",
2594 const_labels: {"name" => name},
2595 )),
2596 blocking_seconds: registry.register(metric!(
2597 name: "mz_persist_semaphore_blocking_seconds",
2598 help: "total time spent blocking on permit acquisition",
2599 const_labels: {"name" => name},
2600 )),
2601 acquired_permits: registry.register(metric!(
2602 name: "mz_persist_semaphore_acquired_permits",
2603 help: "total sum of acquired permits",
2604 const_labels: {"name" => name},
2605 )),
2606 released_permits: registry.register(metric!(
2607 name: "mz_persist_semaphore_released_permits",
2608 help: "total sum of released permits",
2609 const_labels: {"name" => name},
2610 )),
2611 _available_permits: registry.register_computed_gauge(
2612 metric!(
2613 name: "mz_persist_semaphore_available_permits",
2614 help: "currently available permits according to the semaphore",
2615 ),
2616 {
2617 let semaphore = Arc::clone(&semaphore);
2618 move || u64::cast_from(semaphore.available_permits())
2619 },
2620 ),
2621 semaphore,
2622 }
2623 }
2624
2625 pub async fn acquire_permits(&self, requested_permits: usize) -> MetricsPermits {
2626 let total_permits = u32::try_from(self.total_permits).unwrap_or(u32::MAX);
2629 let requested_permits = u32::try_from(requested_permits).unwrap_or(u32::MAX);
2630 let requested_permits = std::cmp::min(requested_permits, total_permits);
2631 let wrap = |_permit| {
2632 self.acquired_permits.inc_by(u64::from(requested_permits));
2633 MetricsPermits {
2634 _permit,
2635 released_metric: self.released_permits.clone(),
2636 count: requested_permits,
2637 }
2638 };
2639
2640 self.acquire_count.inc();
2642 match Arc::clone(&self.semaphore).try_acquire_many_owned(requested_permits) {
2643 Ok(x) => return wrap(x),
2644 Err(_) => {}
2645 };
2646
2647 self.blocking_count.inc();
2649 let start = Instant::now();
2650 let ret = Arc::clone(&self.semaphore)
2651 .acquire_many_owned(requested_permits)
2652 .instrument(info_span!("acquire_permits"))
2653 .await;
2654 let elapsed = start.elapsed();
2655 self.blocking_seconds.inc_by(elapsed.as_secs_f64());
2656 debug!(
2657 "acquisition of {} {} permits blocked for {:?}",
2658 self.name, requested_permits, elapsed
2659 );
2660 wrap(ret.expect("semaphore is never closed"))
2661 }
2662}
2663
/// Semaphore permits returned by `acquire_permits`, paired with the counter
/// that records their release.
///
/// Dropping this value first increments `released_metric` by `count` (see the
/// `Drop` impl) and then drops `_permit`, which returns the permits to the
/// semaphore.
#[derive(Debug)]
pub struct MetricsPermits {
    /// Held only so that dropping it releases the permits.
    _permit: OwnedSemaphorePermit,
    /// Counter incremented by `count` when this guard is dropped.
    released_metric: IntCounter,
    /// Number of permits held (the request after clamping).
    count: u32,
}
2670
2671impl Drop for MetricsPermits {
2672 fn drop(&mut self) {
2673 self.released_metric.inc_by(u64::from(self.count))
2674 }
2675}
2676
/// Counters and timings for one kind of external operation (e.g. a blob `get`
/// or a consensus `head`).
#[derive(Debug)]
pub struct ExternalOpMetrics {
    /// Number of operations started.
    started: IntCounter,
    /// Number of operations that completed without error.
    succeeded: IntCounter,
    /// Number of operations that completed with an error.
    failed: IntCounter,
    /// Bytes attributed to the operation. `run_op`/`run_stream` do not update
    /// this; callers do (e.g. value sizes for blob `get`/`set`, key lengths for
    /// `list_keys`).
    bytes: IntCounter,
    /// Cumulative seconds spent in operations.
    seconds: Counter,
    /// Optional per-operation latency histogram, in seconds.
    seconds_histogram: Option<Histogram>,
    /// Alerting metrics handed to the error hook when an operation fails.
    alerts_metrics: Arc<AlertsMetrics>,
}
2687
2688impl ExternalOpMetrics {
2689 async fn run_op<R, F, OpFn, ErrFn>(
2690 &self,
2691 op_fn: OpFn,
2692 on_err_fn: ErrFn,
2693 ) -> Result<R, ExternalError>
2694 where
2695 F: std::future::Future<Output = Result<R, ExternalError>>,
2696 OpFn: FnOnce() -> F,
2697 ErrFn: FnOnce(&AlertsMetrics, &ExternalError),
2698 {
2699 self.started.inc();
2700 let start = Instant::now();
2701 let res = op_fn().await;
2702 let elapsed_seconds = start.elapsed().as_secs_f64();
2703 self.seconds.inc_by(elapsed_seconds);
2704 if let Some(h) = &self.seconds_histogram {
2705 h.observe(elapsed_seconds);
2706 }
2707 match res.as_ref() {
2708 Ok(_) => self.succeeded.inc(),
2709 Err(err) => {
2710 self.failed.inc();
2711 on_err_fn(&self.alerts_metrics, err);
2712 }
2713 };
2714 res
2715 }
2716
2717 fn run_stream<'a, R: 'a, S, OpFn, ErrFn>(
2718 &'a self,
2719 op_fn: OpFn,
2720 mut on_err_fn: ErrFn,
2721 ) -> impl futures::Stream<Item = Result<R, ExternalError>> + 'a
2722 where
2723 S: futures::Stream<Item = Result<R, ExternalError>> + Unpin + 'a,
2724 OpFn: FnOnce() -> S,
2725 ErrFn: FnMut(&AlertsMetrics, &ExternalError) + 'a,
2726 {
2727 self.started.inc();
2728 let start = Instant::now();
2729 let mut stream = op_fn();
2730 stream! {
2731 let mut succeeded = true;
2732 while let Some(res) = stream.next().await {
2733 if let Err(err) = res.as_ref() {
2734 on_err_fn(&self.alerts_metrics, err);
2735 succeeded = false;
2736 }
2737 yield res;
2738 }
2739 if succeeded {
2740 self.succeeded.inc()
2741 } else {
2742 self.failed.inc()
2743 }
2744 let elapsed_seconds = start.elapsed().as_secs_f64();
2745 self.seconds.inc_by(elapsed_seconds);
2746 if let Some(h) = &self.seconds_histogram {
2747 h.observe(elapsed_seconds);
2748 }
2749 }
2750 }
2751}
2752
/// Metrics for the blob operations forwarded by [`MetricsBlob`].
#[derive(Debug)]
pub struct BlobMetrics {
    /// Metrics for `Blob::set`.
    set: ExternalOpMetrics,
    /// Metrics for `Blob::get`.
    get: ExternalOpMetrics,
    /// Metrics for `Blob::list_keys_and_metadata`.
    list_keys: ExternalOpMetrics,
    /// Metrics for `Blob::delete`.
    delete: ExternalOpMetrics,
    /// Metrics for `Blob::restore`.
    restore: ExternalOpMetrics,
    /// Deletes that reported no deleted bytes (`Ok(None)`); presumably the key
    /// did not exist — confirm against the `Blob` implementations.
    delete_noop: IntCounter,
    /// Sizes, in bytes, of successfully written blobs.
    blob_sizes: Histogram,
    /// Round-trip latency gauge. It is not updated by `MetricsBlob`; it is
    /// presumably set elsewhere.
    pub rtt_latency: Gauge,
}
2764
/// A [`Blob`] wrapper that records metrics for every operation it forwards to
/// the inner `blob`.
#[derive(Debug)]
pub struct MetricsBlob {
    /// The wrapped blob store.
    blob: Arc<dyn Blob>,
    /// Metrics sink; blob operations are recorded under `metrics.blob`.
    metrics: Arc<Metrics>,
}
2770
2771impl MetricsBlob {
2772 pub fn new(blob: Arc<dyn Blob>, metrics: Arc<Metrics>) -> Self {
2773 MetricsBlob { blob, metrics }
2774 }
2775
2776 fn on_err(alerts_metrics: &AlertsMetrics, _err: &ExternalError) {
2777 alerts_metrics.blob_failures.inc()
2778 }
2779}
2780
2781#[async_trait]
2782impl Blob for MetricsBlob {
2783 #[instrument(name = "blob::get", fields(shard=blob_key_shard_id(key)))]
2784 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
2785 let res = self
2786 .metrics
2787 .blob
2788 .get
2789 .run_op(|| self.blob.get(key), Self::on_err)
2790 .await;
2791 if let Ok(Some(value)) = res.as_ref() {
2792 self.metrics
2793 .blob
2794 .get
2795 .bytes
2796 .inc_by(u64::cast_from(value.len()));
2797 }
2798 res
2799 }
2800
2801 #[instrument(name = "blob::list_keys_and_metadata", fields(shard=blob_key_shard_id(key_prefix)))]
2802 async fn list_keys_and_metadata(
2803 &self,
2804 key_prefix: &str,
2805 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
2806 ) -> Result<(), ExternalError> {
2807 let mut byte_total = 0;
2808 let mut instrumented = |blob_metadata: BlobMetadata| {
2809 byte_total += blob_metadata.key.len();
2812 f(blob_metadata)
2813 };
2814
2815 let res = self
2816 .metrics
2817 .blob
2818 .list_keys
2819 .run_op(
2820 || {
2821 self.blob
2822 .list_keys_and_metadata(key_prefix, &mut instrumented)
2823 },
2824 Self::on_err,
2825 )
2826 .await;
2827
2828 self.metrics
2829 .blob
2830 .list_keys
2831 .bytes
2832 .inc_by(u64::cast_from(byte_total));
2833
2834 res
2835 }
2836
2837 #[instrument(name = "blob::set", fields(shard=blob_key_shard_id(key),size_bytes=value.len()))]
2838 async fn set(&self, key: &str, value: Bytes) -> Result<(), ExternalError> {
2839 let bytes = value.len();
2840 let res = self
2841 .metrics
2842 .blob
2843 .set
2844 .run_op(|| self.blob.set(key, value), Self::on_err)
2845 .await;
2846 if res.is_ok() {
2847 self.metrics.blob.set.bytes.inc_by(u64::cast_from(bytes));
2848 self.metrics.blob.blob_sizes.observe(f64::cast_lossy(bytes));
2849 }
2850 res
2851 }
2852
2853 #[instrument(name = "blob::delete", fields(shard=blob_key_shard_id(key)))]
2854 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
2855 let bytes = self
2856 .metrics
2857 .blob
2858 .delete
2859 .run_op(|| self.blob.delete(key), Self::on_err)
2860 .await?;
2861 if let Some(bytes) = bytes {
2862 self.metrics.blob.delete.bytes.inc_by(u64::cast_from(bytes));
2863 } else {
2864 self.metrics.blob.delete_noop.inc();
2865 }
2866 Ok(bytes)
2867 }
2868
2869 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
2870 self.metrics
2871 .blob
2872 .restore
2873 .run_op(|| self.blob.restore(key), Self::on_err)
2874 .await
2875 }
2876}
2877
/// Metrics for the consensus operations forwarded by [`MetricsConsensus`].
#[derive(Debug)]
pub struct ConsensusMetrics {
    /// Metrics for `Consensus::list_keys` (recorded via `run_stream`).
    list_keys: ExternalOpMetrics,
    /// Metrics for `Consensus::head`.
    head: ExternalOpMetrics,
    /// Metrics for `Consensus::compare_and_set`; bytes count committed writes only.
    compare_and_set: ExternalOpMetrics,
    /// Metrics for `Consensus::scan`.
    scan: ExternalOpMetrics,
    /// Metrics for `Consensus::truncate`.
    truncate: ExternalOpMetrics,
    /// Sum of the counts reported by successful truncations.
    truncated_count: IntCounter,
    /// Round-trip latency gauge. It is not updated by `MetricsConsensus`; it is
    /// presumably set elsewhere.
    pub rtt_latency: Gauge,
}
2888
/// A [`Consensus`] wrapper that records metrics for every operation it
/// forwards to the inner `consensus`.
#[derive(Debug)]
pub struct MetricsConsensus {
    /// The wrapped consensus implementation.
    consensus: Arc<dyn Consensus>,
    /// Metrics sink; consensus operations are recorded under `metrics.consensus`.
    metrics: Arc<Metrics>,
}
2894
2895impl MetricsConsensus {
2896 pub fn new(consensus: Arc<dyn Consensus>, metrics: Arc<Metrics>) -> Self {
2897 MetricsConsensus { consensus, metrics }
2898 }
2899
2900 fn on_err(alerts_metrics: &AlertsMetrics, err: &ExternalError) {
2901 if let ExternalError::Indeterminate(_) = err {
2905 alerts_metrics.consensus_failures.inc()
2906 }
2907 }
2908}
2909
2910#[async_trait]
2911impl Consensus for MetricsConsensus {
2912 fn list_keys(&self) -> ResultStream<'_, String> {
2913 Box::pin(
2914 self.metrics
2915 .consensus
2916 .list_keys
2917 .run_stream(|| self.consensus.list_keys(), Self::on_err),
2918 )
2919 }
2920
2921 #[instrument(name = "consensus::head", fields(shard=key))]
2922 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
2923 let res = self
2924 .metrics
2925 .consensus
2926 .head
2927 .run_op(|| self.consensus.head(key), Self::on_err)
2928 .await;
2929 if let Ok(Some(data)) = res.as_ref() {
2930 self.metrics
2931 .consensus
2932 .head
2933 .bytes
2934 .inc_by(u64::cast_from(data.data.len()));
2935 }
2936 res
2937 }
2938
2939 #[instrument(name = "consensus::compare_and_set", fields(shard=key,size_bytes=new.data.len()))]
2940 async fn compare_and_set(
2941 &self,
2942 key: &str,
2943 new: VersionedData,
2944 ) -> Result<CaSResult, ExternalError> {
2945 let bytes = new.data.len();
2946 let res = self
2947 .metrics
2948 .consensus
2949 .compare_and_set
2950 .run_op(|| self.consensus.compare_and_set(key, new), Self::on_err)
2951 .await;
2952 match res.as_ref() {
2953 Ok(CaSResult::Committed) => self
2954 .metrics
2955 .consensus
2956 .compare_and_set
2957 .bytes
2958 .inc_by(u64::cast_from(bytes)),
2959 Ok(CaSResult::ExpectationMismatch) | Err(_) => {}
2960 }
2961 res
2962 }
2963
2964 #[instrument(name = "consensus::scan", fields(shard=key))]
2965 async fn scan(
2966 &self,
2967 key: &str,
2968 from: SeqNo,
2969 limit: usize,
2970 ) -> Result<Vec<VersionedData>, ExternalError> {
2971 let res = self
2972 .metrics
2973 .consensus
2974 .scan
2975 .run_op(|| self.consensus.scan(key, from, limit), Self::on_err)
2976 .await;
2977 if let Ok(dataz) = res.as_ref() {
2978 let bytes: usize = dataz.iter().map(|x| x.data.len()).sum();
2979 self.metrics
2980 .consensus
2981 .scan
2982 .bytes
2983 .inc_by(u64::cast_from(bytes));
2984 }
2985 res
2986 }
2987
2988 #[instrument(name = "consensus::truncate", fields(shard=key))]
2989 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
2990 let metrics = &self.metrics.consensus;
2991 let deleted = metrics
2992 .truncate
2993 .run_op(|| self.consensus.truncate(key, seqno), Self::on_err)
2994 .await?;
2995 if let Some(deleted) = deleted {
2996 metrics.truncated_count.inc_by(u64::cast_from(deleted));
2997 }
2998 Ok(deleted)
2999 }
3000}
3001
/// Gauges exported from a [`TaskMonitor`]. They are refreshed from the
/// monitor's cumulative metrics each time this value is collected.
#[derive(Debug, Clone)]
pub struct TaskMetrics {
    /// Float-valued gauges, each paired with the function that extracts its
    /// value from the monitor's metrics.
    f64_gauges: Vec<(Gauge, fn(&tokio_metrics::TaskMetrics) -> f64)>,
    /// Integer-valued gauges, each paired with the function that extracts its
    /// value from the monitor's metrics.
    u64_gauges: Vec<(
        GenericGauge<AtomicU64>,
        fn(&tokio_metrics::TaskMetrics) -> u64,
    )>,
    /// Monitor used both to instrument tasks and to read their metrics.
    monitor: TaskMonitor,
}
3013
3014impl TaskMetrics {
3015 pub fn new(name: &str) -> Self {
3016 let monitor = TaskMonitor::new();
3017 Self {
3018 f64_gauges: vec![
3019 (
3020 Gauge::make_collector(metric!(
3021 name: "mz_persist_task_total_idle_duration",
3022 help: "Seconds of time spent idling, ie. waiting for a task to be woken up.",
3023 const_labels: {"name" => name}
3024 )),
3025 |m| m.total_idle_duration.as_secs_f64(),
3026 ),
3027 (
3028 Gauge::make_collector(metric!(
3029 name: "mz_persist_task_total_scheduled_duration",
3030 help: "Seconds of time spent scheduled, ie. ready to poll but not yet polled.",
3031 const_labels: {"name" => name}
3032 )),
3033 |m| m.total_scheduled_duration.as_secs_f64(),
3034 ),
3035 ],
3036 u64_gauges: vec![
3037 (
3038 MakeCollector::make_collector(metric!(
3039 name: "mz_persist_task_total_scheduled_count",
3040 help: "The total number of task schedules. Useful for computing the average scheduled time.",
3041 const_labels: {"name" => name}
3042 )),
3043 |m| m.total_scheduled_count,
3044 ),
3045 (
3046 MakeCollector::make_collector(metric!(
3047 name: "mz_persist_task_total_idled_count",
3048 help: "The total number of task idles. Useful for computing the average idle time.",
3049 const_labels: {"name" => name}
3050 ,
3051 )),
3052 |m| m.total_idled_count,
3053 ),
3054 ],
3055 monitor,
3056 }
3057 }
3058
3059 pub fn instrument_task<F>(&self, task: F) -> tokio_metrics::Instrumented<F> {
3062 TaskMonitor::instrument(&self.monitor, task)
3063 }
3064}
3065
3066impl Collector for TaskMetrics {
3067 fn desc(&self) -> Vec<&Desc> {
3068 let mut descs = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3069 for (g, _) in &self.f64_gauges {
3070 descs.extend(g.desc());
3071 }
3072 for (g, _) in &self.u64_gauges {
3073 descs.extend(g.desc());
3074 }
3075 descs
3076 }
3077
3078 fn collect(&self) -> Vec<MetricFamily> {
3079 let mut families = Vec::with_capacity(self.f64_gauges.len() + self.u64_gauges.len());
3080 let metrics = self.monitor.cumulative();
3081 for (g, metrics_fn) in &self.f64_gauges {
3082 g.set(metrics_fn(&metrics));
3083 families.extend(g.collect());
3084 }
3085 for (g, metrics_fn) in &self.u64_gauges {
3086 g.set(metrics_fn(&metrics));
3087 families.extend(g.collect());
3088 }
3089 families
3090 }
3091}
3092
/// Metrics for individually instrumented persist tasks.
#[derive(Debug)]
pub struct TasksMetrics {
    /// Monitor and gauges for the task labelled `heartbeat_read`.
    pub heartbeat_read: TaskMetrics,
}
3097
3098impl TasksMetrics {
3099 fn new(registry: &MetricsRegistry) -> Self {
3100 let heartbeat_read = TaskMetrics::new("heartbeat_read");
3101 registry.register_collector(heartbeat_read.clone());
3102 TasksMetrics { heartbeat_read }
3103 }
3104}
3105
/// Metrics for the schema cache and for schema migrations of fetched parts.
#[derive(Debug)]
pub struct SchemaMetrics {
    /// Count of state fetches performed by the schema cache.
    pub(crate) cache_fetch_state_count: IntCounter,
    /// Schema cache counters with `op="schema"`.
    pub(crate) cache_schema: SchemaCacheMetrics,
    /// Schema cache counters with `op="migration"`.
    pub(crate) cache_migration: SchemaCacheMetrics,
    /// Fetch part migration count, `op="same"`.
    pub(crate) migration_count_same: IntCounter,
    /// Fetch part migration count, `op="codec"`.
    pub(crate) migration_count_codec: IntCounter,
    /// Fetch part migration count, `op="either"`.
    pub(crate) migration_count_either: IntCounter,
    /// Migrated update records, `op="legacy_codec"`.
    pub(crate) migration_len_legacy_codec: IntCounter,
    /// Migrated update records, `op="either_codec"`.
    pub(crate) migration_len_either_codec: IntCounter,
    /// Migrated update records, `op="either_arrow"`.
    pub(crate) migration_len_either_arrow: IntCounter,
    /// Count of migrations constructed.
    pub(crate) migration_new_count: IntCounter,
    /// Seconds spent constructing migration logic.
    pub(crate) migration_new_seconds: Counter,
    /// Seconds spent applying migration logic.
    pub(crate) migration_migrate_seconds: Counter,
}
3121
3122impl SchemaMetrics {
3123 fn new(registry: &MetricsRegistry) -> Self {
3124 let cached: IntCounterVec = registry.register(metric!(
3125 name: "mz_persist_schema_cache_cached_count",
3126 help: "count of schema cache entries served from cache",
3127 var_labels: ["op"],
3128 ));
3129 let computed: IntCounterVec = registry.register(metric!(
3130 name: "mz_persist_schema_cache_computed_count",
3131 help: "count of schema cache entries computed",
3132 var_labels: ["op"],
3133 ));
3134 let unavailable: IntCounterVec = registry.register(metric!(
3135 name: "mz_persist_schema_cache_unavailable_count",
3136 help: "count of schema cache entries unavailable at current state",
3137 var_labels: ["op"],
3138 ));
3139 let added: IntCounterVec = registry.register(metric!(
3140 name: "mz_persist_schema_cache_added_count",
3141 help: "count of schema cache entries added",
3142 var_labels: ["op"],
3143 ));
3144 let dropped: IntCounterVec = registry.register(metric!(
3145 name: "mz_persist_schema_cache_dropped_count",
3146 help: "count of schema cache entries dropped",
3147 var_labels: ["op"],
3148 ));
3149 let cache = |name| SchemaCacheMetrics {
3150 cached_count: cached.with_label_values(&[name]),
3151 computed_count: computed.with_label_values(&[name]),
3152 unavailable_count: unavailable.with_label_values(&[name]),
3153 added_count: added.with_label_values(&[name]),
3154 dropped_count: dropped.with_label_values(&[name]),
3155 };
3156 let migration_count: IntCounterVec = registry.register(metric!(
3157 name: "mz_persist_schema_migration_count",
3158 help: "count of fetch part migrations",
3159 var_labels: ["op"],
3160 ));
3161 let migration_len: IntCounterVec = registry.register(metric!(
3162 name: "mz_persist_schema_migration_len",
3163 help: "count of migrated update records",
3164 var_labels: ["op"],
3165 ));
3166 SchemaMetrics {
3167 cache_fetch_state_count: registry.register(metric!(
3168 name: "mz_persist_schema_cache_fetch_state_count",
3169 help: "count of state fetches by the schema cache",
3170 )),
3171 cache_schema: cache("schema"),
3172 cache_migration: cache("migration"),
3173 migration_count_same: migration_count.with_label_values(&["same"]),
3174 migration_count_codec: migration_count.with_label_values(&["codec"]),
3175 migration_count_either: migration_count.with_label_values(&["either"]),
3176 migration_len_legacy_codec: migration_len.with_label_values(&["legacy_codec"]),
3177 migration_len_either_codec: migration_len.with_label_values(&["either_codec"]),
3178 migration_len_either_arrow: migration_len.with_label_values(&["either_arrow"]),
3179 migration_new_count: registry.register(metric!(
3180 name: "mz_persist_schema_migration_new_count",
3181 help: "count of migrations constructed",
3182 )),
3183 migration_new_seconds: registry.register(metric!(
3184 name: "mz_persist_schema_migration_new_seconds",
3185 help: "seconds spent constructing migration logic",
3186 )),
3187 migration_migrate_seconds: registry.register(metric!(
3188 name: "mz_persist_schema_migration_migrate_seconds",
3189 help: "seconds spent applying migration logic",
3190 )),
3191 }
3192 }
3193}
3194
/// Counters for a single schema cache. Each field is the child counter of the
/// corresponding `mz_persist_schema_cache_*` vector for that cache's "op" label.
#[derive(Debug, Clone)]
pub struct SchemaCacheMetrics {
    /// Entries served from the cache.
    pub(crate) cached_count: IntCounter,
    /// Entries computed.
    pub(crate) computed_count: IntCounter,
    /// Entries unavailable at the current state.
    pub(crate) unavailable_count: IntCounter,
    /// Entries added to the cache.
    pub(crate) added_count: IntCounter,
    /// Entries dropped from the cache.
    pub(crate) dropped_count: IntCounter,
}
3203
/// Metrics for parts that are committed inline into state.
#[derive(Debug)]
pub struct InlineMetrics {
    /// Count of inline parts committed to state.
    pub(crate) part_commit_count: IntCounter,
    /// Total size, in bytes, of inline parts committed to state.
    pub(crate) part_commit_bytes: IntCounter,
    /// Batch write metrics registered under the "inline_backpressure" name.
    pub(crate) backpressure: BatchWriteMetrics,
}
3210
3211impl InlineMetrics {
3212 fn new(registry: &MetricsRegistry) -> Self {
3213 InlineMetrics {
3214 part_commit_count: registry.register(metric!(
3215 name: "mz_persist_inline_part_commit_count",
3216 help: "count of inline parts committed to state",
3217 )),
3218 part_commit_bytes: registry.register(metric!(
3219 name: "mz_persist_inline_part_commit_bytes",
3220 help: "total size of of inline parts committed to state",
3221 )),
3222 backpressure: BatchWriteMetrics::new(registry, "inline_backpressure"),
3223 }
3224 }
3225}
3226
3227fn blob_key_shard_id(key: &str) -> Option<String> {
3228 let (shard_id, _) = BlobKey::parse_ids(key).ok()?;
3229 Some(shard_id.to_string())
3230}
3231
3232pub fn encode_ts_metric<T: Codec64>(ts: &Antichain<T>) -> i64 {
3234 match ts.elements().first() {
3244 Some(ts) => i64::from_le_bytes(Codec64::encode(ts)),
3245 None => i64::MAX,
3246 }
3247}