mz_service/
grpc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! gRPC transport for the [client](crate::client) module.
11//!
12//! Note: gRPC has been replaced by [CTP](crate::transport). Most of the gRPC code has been
13//! removed, but some vestiges remain until we find time to move them to more appropriate places.
14
15use std::time::UNIX_EPOCH;
16
17use mz_ore::metric;
18use mz_ore::metrics::{DeleteOnDropGauge, MetricsRegistry, UIntGaugeVec};
19use prometheus::core::AtomicU64;
20use tracing::error;
21
22use crate::transport;
23
24/// Metrics for a gRPC server.
25pub struct GrpcServerMetrics {
26    last_command_received: UIntGaugeVec,
27}
28
29impl GrpcServerMetrics {
30    /// Registers the GRPC server metrics into a `registry`.
31    pub fn register_with(registry: &MetricsRegistry) -> Self {
32        Self {
33            last_command_received: registry.register(metric!(
34                name: "mz_grpc_server_last_command_received",
35                help: "The time at which the server received its last command.",
36                var_labels: ["server_name"],
37            )),
38        }
39    }
40
41    pub fn for_server(&self, name: &'static str) -> PerGrpcServerMetrics {
42        PerGrpcServerMetrics {
43            last_command_received: self
44                .last_command_received
45                .get_delete_on_drop_metric(vec![name]),
46        }
47    }
48}
49
50#[derive(Clone, Debug)]
51pub struct PerGrpcServerMetrics {
52    last_command_received: DeleteOnDropGauge<AtomicU64, Vec<&'static str>>,
53}
54
55impl<C, R> transport::Metrics<C, R> for PerGrpcServerMetrics {
56    fn bytes_sent(&mut self, _len: usize) {}
57    fn bytes_received(&mut self, _len: usize) {}
58    fn message_sent(&mut self, _msg: &C) {}
59
60    fn message_received(&mut self, _msg: &R) {
61        match UNIX_EPOCH.elapsed() {
62            Ok(ts) => self.last_command_received.set(ts.as_secs()),
63            Err(e) => error!("failed to get system time: {e}"),
64        }
65    }
66}