Skip to main content

mz_compute_client/
metrics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metrics for the compute controller components
11
12use std::borrow::Borrow;
13use std::sync::Arc;
14use std::time::Duration;
15
16use mz_cluster_client::ReplicaId;
17use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
18use mz_compute_types::ComputeInstanceId;
19use mz_ore::cast::CastFrom;
20use mz_ore::metric;
21use mz_ore::metrics::raw::UIntGaugeVec;
22use mz_ore::metrics::{
23    CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, HistogramVec,
24    IntCounterVec, MetricVecExt, MetricsRegistry, Rule,
25};
26use mz_ore::stats::histogram_seconds_buckets;
27use mz_repr::GlobalId;
28use mz_service::transport;
29use prometheus::core::{AtomicF64, AtomicU64};
30
31use crate::protocol::command::ComputeCommand;
32use crate::protocol::response::{ComputeResponse, PeekResponse};
33
/// A `DeleteOnDropCounter` holding `f64` values (`AtomicF64`), with `Vec<String>` label values.
pub(crate) type Counter = DeleteOnDropCounter<AtomicF64, Vec<String>>;
/// A `DeleteOnDropCounter` holding `u64` values (`AtomicU64`), with `Vec<String>` label values.
pub(crate) type IntCounter = DeleteOnDropCounter<AtomicU64, Vec<String>>;
/// A `DeleteOnDropGauge` holding `u64` values (`AtomicU64`), with `Vec<String>` label values.
pub(crate) type UIntGauge = DeleteOnDropGauge<AtomicU64, Vec<String>>;
/// A `DeleteOnDropHistogram` with `Vec<String>` label values.
type Histogram = DeleteOnDropHistogram<Vec<String>>;
38
/// Compute controller metrics.
///
/// Holds the labeled metric vectors registered by the compute controller. Per-instance and
/// per-replica handles are derived from these vectors by fixing their label values.
#[derive(Debug, Clone)]
pub struct ComputeControllerMetrics {
    // compute protocol
    /// Total number of compute commands sent, by instance, replica, and command type.
    commands_total: IntCounterVec,
    /// Total number of bytes sent in compute command messages, by instance and replica.
    command_message_bytes_total: IntCounterVec,
    /// Total number of compute responses, by instance, replica, and response type.
    responses_total: IntCounterVec,
    /// Total number of bytes in compute response messages, by instance and replica.
    response_message_bytes_total: IntCounterVec,

    // controller state
    /// Number of replicas, by instance.
    replica_count: UIntGaugeVec,
    /// Number of installed compute collections, by instance.
    collection_count: UIntGaugeVec,
    /// Number of installed but unscheduled compute collections, by instance.
    collection_unscheduled_count: UIntGaugeVec,
    /// Number of pending peeks, by instance.
    peek_count: UIntGaugeVec,
    /// Number of active subscribes, by instance.
    subscribe_count: UIntGaugeVec,
    /// Number of active copy tos, by instance.
    copy_to_count: UIntGaugeVec,
    /// Size of the compute command queue, by instance and replica.
    command_queue_size: UIntGaugeVec,
    /// Number of sends on the compute response queue, by instance.
    response_send_count: IntCounterVec,
    /// Number of receives on the compute response queue, by instance.
    response_recv_count: IntCounterVec,
    /// Size of the compute hydration queue, by instance and replica.
    hydration_queue_size: UIntGaugeVec,

    // command history
    /// Number of commands in the controller's command history, by instance and command type.
    history_command_count: UIntGaugeVec,
    /// Number of dataflows in the controller's command history, by instance.
    history_dataflow_count: UIntGaugeVec,

    // peeks
    /// Total number of peeks served, by instance and result.
    peeks_total: IntCounterVec,
    /// Histogram of peek durations in seconds, by instance and result.
    peek_duration_seconds: HistogramVec,

    // replica connections
    /// Number of replicas successfully connected to the compute controller, by instance.
    connected_replica_count: UIntGaugeVec,
    /// Total number of replica (re-)connections made, by instance and replica.
    replica_connects_total: IntCounterVec,
    /// Total time spent waiting for replica (re-)connection, by instance and replica.
    replica_connect_wait_time_seconds_total: CounterVec,

    /// Metrics shared with the storage controller.
    shared: ControllerMetrics,
}
76
77fn instance_name_rule() -> Rule {
78    Rule::ClusterNameLookup {
79        cluster_id_label: "instance_id".into(),
80        output_label: "instance_name".into(),
81    }
82}
83
84fn replica_name_rule() -> Rule {
85    Rule::ReplicaNameLookup {
86        cluster_id_label: "instance_id".into(),
87        replica_id_label: "replica_id".into(),
88        output_label: "replica_name".into(),
89    }
90}
91
92impl ComputeControllerMetrics {
93    /// Create a metrics instance registered into the given registry.
94    pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
95        let metrics = ComputeControllerMetrics {
96            commands_total: metrics_registry.register(metric!(
97                name: "mz_compute_commands_total",
98                help: "The total number of compute commands sent.",
99                var_labels: ["instance_id", "replica_id", "command_type"],
100                rules: [instance_name_rule(), replica_name_rule()],
101            )),
102            command_message_bytes_total: metrics_registry.register(metric!(
103                name: "mz_compute_command_message_bytes_total",
104                help: "The total number of bytes sent in compute command messages.",
105                var_labels: ["instance_id", "replica_id"],
106                rules: [instance_name_rule(), replica_name_rule()],
107            )),
108            responses_total: metrics_registry.register(metric!(
109                name: "mz_compute_responses_total",
110                help: "The total number of compute responses sent.",
111                var_labels: ["instance_id", "replica_id", "response_type"],
112                rules: [instance_name_rule(), replica_name_rule()],
113            )),
114            response_message_bytes_total: metrics_registry.register(metric!(
115                name: "mz_compute_response_message_bytes_total",
116                help: "The total number of bytes sent in compute response messages.",
117                var_labels: ["instance_id", "replica_id"],
118                rules: [instance_name_rule(), replica_name_rule()],
119            )),
120            replica_count: metrics_registry.register(metric!(
121                name: "mz_compute_controller_replica_count",
122                help: "The number of replicas.",
123                var_labels: ["instance_id"],
124                rules: [instance_name_rule()],
125            )),
126            collection_count: metrics_registry.register(metric!(
127                name: "mz_compute_controller_collection_count",
128                help: "The number of installed compute collections.",
129                var_labels: ["instance_id"],
130                rules: [instance_name_rule()],
131            )),
132            collection_unscheduled_count: metrics_registry.register(metric!(
133                name: "mz_compute_controller_collection_unscheduled_count",
134                help: "The number of installed but unscheduled compute collections.",
135                var_labels: ["instance_id"],
136                rules: [instance_name_rule()],
137            )),
138            peek_count: metrics_registry.register(metric!(
139                name: "mz_compute_controller_peek_count",
140                help: "The number of pending peeks.",
141                var_labels: ["instance_id"],
142                rules: [instance_name_rule()],
143            )),
144            subscribe_count: metrics_registry.register(metric!(
145                name: "mz_compute_controller_subscribe_count",
146                help: "The number of active subscribes.",
147                var_labels: ["instance_id"],
148                rules: [instance_name_rule()],
149            )),
150            copy_to_count: metrics_registry.register(metric!(
151                name: "mz_compute_controller_copy_to_count",
152                help: "The number of active copy tos.",
153                var_labels: ["instance_id"],
154                rules: [instance_name_rule()],
155            )),
156            command_queue_size: metrics_registry.register(metric!(
157                name: "mz_compute_controller_command_queue_size",
158                help: "The size of the compute command queue.",
159                var_labels: ["instance_id", "replica_id"],
160                rules: [instance_name_rule(), replica_name_rule()],
161            )),
162            response_send_count: metrics_registry.register(metric!(
163                name: "mz_compute_controller_response_send_count",
164                help: "The number of sends on the compute response queue.",
165                var_labels: ["instance_id"],
166                rules: [instance_name_rule()],
167            )),
168            response_recv_count: metrics_registry.register(metric!(
169                name: "mz_compute_controller_response_recv_count",
170                help: "The number of receives on the compute response queue.",
171                var_labels: ["instance_id"],
172                rules: [instance_name_rule()],
173            )),
174            hydration_queue_size: metrics_registry.register(metric!(
175                name: "mz_compute_controller_hydration_queue_size",
176                help: "The size of the compute hydration queue.",
177                var_labels: ["instance_id", "replica_id"],
178                rules: [instance_name_rule(), replica_name_rule()],
179            )),
180            history_command_count: metrics_registry.register(metric!(
181                name: "mz_compute_controller_history_command_count",
182                help: "The number of commands in the controller's command history.",
183                var_labels: ["instance_id", "command_type"],
184                rules: [instance_name_rule()],
185            )),
186            history_dataflow_count: metrics_registry.register(metric!(
187                name: "mz_compute_controller_history_dataflow_count",
188                help: "The number of dataflows in the controller's command history.",
189                var_labels: ["instance_id"],
190                rules: [instance_name_rule()],
191            )),
192            peeks_total: metrics_registry.register(metric!(
193                name: "mz_compute_peeks_total",
194                help: "The total number of peeks served.",
195                var_labels: ["instance_id", "result"],
196                rules: [instance_name_rule()],
197            )),
198            peek_duration_seconds: metrics_registry.register(metric!(
199                name: "mz_compute_peek_duration_seconds",
200                help: "A histogram of peek durations since restart.",
201                var_labels: ["instance_id", "result"],
202                buckets: histogram_seconds_buckets(0.000_500, 32.),
203                rules: [instance_name_rule()],
204            )),
205            connected_replica_count: metrics_registry.register(metric!(
206                name: "mz_compute_controller_connected_replica_count",
207                help: "The number of replicas successfully connected to the compute controller.",
208                var_labels: ["instance_id"],
209                rules: [instance_name_rule()],
210            )),
211            replica_connects_total: metrics_registry.register(metric!(
212                name: "mz_compute_controller_replica_connects_total",
213                help: "The total number of replica (re-)connections made by the compute controller.",
214                var_labels: ["instance_id", "replica_id"],
215                rules: [instance_name_rule(), replica_name_rule()],
216            )),
217            replica_connect_wait_time_seconds_total: metrics_registry.register(metric!(
218                name: "mz_compute_controller_replica_connect_wait_time_seconds_total",
219                help: "The total time the compute controller spent waiting for replica (re-)connection.",
220                var_labels: ["instance_id", "replica_id"],
221                rules: [instance_name_rule(), replica_name_rule()],
222            )),
223
224            shared,
225        };
226
227        metrics
228    }
229
230    /// Return an object suitable for tracking metrics for the given compute instance.
231    pub fn for_instance(&self, instance_id: ComputeInstanceId) -> InstanceMetrics {
232        let labels = vec![instance_id.to_string()];
233        let replica_count = self.replica_count.get_delete_on_drop_metric(labels.clone());
234        let collection_count = self
235            .collection_count
236            .get_delete_on_drop_metric(labels.clone());
237        let collection_unscheduled_count = self
238            .collection_unscheduled_count
239            .get_delete_on_drop_metric(labels.clone());
240        let peek_count = self.peek_count.get_delete_on_drop_metric(labels.clone());
241        let subscribe_count = self
242            .subscribe_count
243            .get_delete_on_drop_metric(labels.clone());
244        let copy_to_count = self.copy_to_count.get_delete_on_drop_metric(labels.clone());
245        let history_command_count = CommandMetrics::build(|typ| {
246            let labels = labels.iter().cloned().chain([typ.into()]).collect();
247            self.history_command_count.get_delete_on_drop_metric(labels)
248        });
249        let history_dataflow_count = self
250            .history_dataflow_count
251            .get_delete_on_drop_metric(labels.clone());
252        let peeks_total = PeekMetrics::build(|typ| {
253            let labels = labels.iter().cloned().chain([typ.into()]).collect();
254            self.peeks_total.get_delete_on_drop_metric(labels)
255        });
256        let peek_duration_seconds = PeekMetrics::build(|typ| {
257            let labels = labels.iter().cloned().chain([typ.into()]).collect();
258            self.peek_duration_seconds.get_delete_on_drop_metric(labels)
259        });
260        let response_send_count = self
261            .response_send_count
262            .get_delete_on_drop_metric(labels.clone());
263        let response_recv_count = self
264            .response_recv_count
265            .get_delete_on_drop_metric(labels.clone());
266        let connected_replica_count = self
267            .connected_replica_count
268            .get_delete_on_drop_metric(labels);
269
270        InstanceMetrics {
271            instance_id,
272            metrics: self.clone(),
273            replica_count,
274            collection_count,
275            collection_unscheduled_count,
276            copy_to_count,
277            peek_count,
278            subscribe_count,
279            history_command_count,
280            history_dataflow_count,
281            peeks_total,
282            peek_duration_seconds,
283            response_send_count,
284            response_recv_count,
285            connected_replica_count,
286        }
287    }
288}
289
/// Per-instance metrics.
///
/// Bundles the metric handles of a single compute instance, together with the controller-wide
/// metric vectors from which per-replica and history handles are derived.
#[derive(Debug)]
pub struct InstanceMetrics {
    /// ID of the compute instance these metrics are labeled with.
    instance_id: ComputeInstanceId,
    /// The controller-wide metric vectors; used to derive per-replica and history handles.
    metrics: ComputeControllerMetrics,

    /// Gauge tracking the number of replicas.
    pub replica_count: UIntGauge,
    /// Gauge tracking the number of installed compute collections.
    pub collection_count: UIntGauge,
    /// Gauge tracking the number of installed but unscheduled compute collections.
    pub collection_unscheduled_count: UIntGauge,
    /// Gauge tracking the number of pending peeks.
    pub peek_count: UIntGauge,
    /// Gauge tracking the number of active subscribes.
    pub subscribe_count: UIntGauge,
    /// Gauge tracking the number of active COPY TO queries.
    pub copy_to_count: UIntGauge,
    /// Gauges tracking the number of commands in the command history.
    pub history_command_count: CommandMetrics<UIntGauge>,
    /// Gauge tracking the number of dataflows in the command history.
    pub history_dataflow_count: UIntGauge,
    /// Counter tracking the total number of peeks served.
    pub peeks_total: PeekMetrics<IntCounter>,
    /// Histogram tracking peek durations.
    pub peek_duration_seconds: PeekMetrics<Histogram>,
    /// Counter tracking the number of sends on the compute response queue.
    pub response_send_count: IntCounter,
    /// Counter tracking the number of receives on the compute response queue.
    pub response_recv_count: IntCounter,
    /// Gauge tracking the number of connected replicas.
    pub connected_replica_count: UIntGauge,
}
323
324impl InstanceMetrics {
325    /// TODO(database-issues#7533): Add documentation.
326    pub fn for_replica(&self, replica_id: ReplicaId) -> ReplicaMetrics {
327        let labels = vec![self.instance_id.to_string(), replica_id.to_string()];
328        let extended_labels = |extra: &str| {
329            labels
330                .iter()
331                .cloned()
332                .chain([extra.into()])
333                .collect::<Vec<_>>()
334        };
335
336        let commands_total = CommandMetrics::build(|typ| {
337            let labels = extended_labels(typ);
338            self.metrics
339                .commands_total
340                .get_delete_on_drop_metric(labels)
341        });
342        let responses_total = ResponseMetrics::build(|typ| {
343            let labels = extended_labels(typ);
344            self.metrics
345                .responses_total
346                .get_delete_on_drop_metric(labels)
347        });
348
349        let command_message_bytes_total = self
350            .metrics
351            .command_message_bytes_total
352            .get_delete_on_drop_metric(labels.clone());
353        let response_message_bytes_total = self
354            .metrics
355            .response_message_bytes_total
356            .get_delete_on_drop_metric(labels.clone());
357
358        let command_queue_size = self
359            .metrics
360            .command_queue_size
361            .get_delete_on_drop_metric(labels.clone());
362        let hydration_queue_size = self
363            .metrics
364            .hydration_queue_size
365            .get_delete_on_drop_metric(labels.clone());
366
367        let replica_connects_total = self
368            .metrics
369            .replica_connects_total
370            .get_delete_on_drop_metric(labels.clone());
371        let replica_connect_wait_time_seconds_total = self
372            .metrics
373            .replica_connect_wait_time_seconds_total
374            .get_delete_on_drop_metric(labels);
375
376        ReplicaMetrics {
377            instance_id: self.instance_id,
378            replica_id,
379            metrics: self.metrics.clone(),
380            inner: Arc::new(ReplicaMetricsInner {
381                commands_total,
382                command_message_bytes_total,
383                responses_total,
384                response_message_bytes_total,
385                command_queue_size,
386                hydration_queue_size,
387                replica_connects_total,
388                replica_connect_wait_time_seconds_total,
389            }),
390        }
391    }
392
393    /// TODO(database-issues#7533): Add documentation.
394    pub fn for_history(&self) -> HistoryMetrics<UIntGauge> {
395        let labels = vec![self.instance_id.to_string()];
396        let command_counts = CommandMetrics::build(|typ| {
397            let labels = labels.iter().cloned().chain([typ.into()]).collect();
398            self.metrics
399                .history_command_count
400                .get_delete_on_drop_metric(labels)
401        });
402        let dataflow_count = self
403            .metrics
404            .history_dataflow_count
405            .get_delete_on_drop_metric(labels);
406
407        HistoryMetrics {
408            command_counts,
409            dataflow_count,
410        }
411    }
412
413    /// Reflect the given peek response in the metrics.
414    pub fn observe_peek_response(&self, response: &PeekResponse, duration: Duration) {
415        self.peeks_total.for_peek_response(response).inc();
416        self.peek_duration_seconds
417            .for_peek_response(response)
418            .observe(duration.as_secs_f64());
419    }
420}
421
/// Per-replica metrics.
///
/// Cloning yields another handle to the same `inner` counters, since those sit behind an `Arc`.
#[derive(Debug, Clone)]
pub struct ReplicaMetrics {
    /// ID of the compute instance the replica belongs to.
    instance_id: ComputeInstanceId,
    /// ID of the replica these metrics are labeled with.
    replica_id: ReplicaId,
    /// The controller-wide metric vectors; used to derive per-collection metrics.
    metrics: ComputeControllerMetrics,

    /// Metrics counters, wrapped in an `Arc` to be shareable between threads.
    pub inner: Arc<ReplicaMetricsInner>,
}
432
/// Per-replica metrics counters.
#[derive(Debug)]
pub struct ReplicaMetricsInner {
    /// Counters tracking the total number of compute commands sent, keyed by command type.
    commands_total: CommandMetrics<IntCounter>,
    /// Counter tracking the total number of bytes sent in compute command messages.
    command_message_bytes_total: IntCounter,
    /// Counters tracking the total number of compute responses, keyed by response type.
    responses_total: ResponseMetrics<IntCounter>,
    /// Counter tracking the total number of bytes in compute response messages.
    response_message_bytes_total: IntCounter,

    /// Gauge tracking the size of the compute command queue.
    pub command_queue_size: UIntGauge,
    /// Gauge tracking the size of the hydration queue.
    pub hydration_queue_size: UIntGauge,

    /// Counter tracking the total number of (re-)connects.
    replica_connects_total: IntCounter,
    /// Counter tracking the total time spent waiting for (re-)connects.
    replica_connect_wait_time_seconds_total: Counter,
}
451
452impl ReplicaMetrics {
453    pub(crate) fn for_collection(
454        &self,
455        collection_id: GlobalId,
456    ) -> Option<ReplicaCollectionMetrics> {
457        // In an effort to reduce the cardinality of timeseries created, we collect metrics only
458        // for non-transient collections. This is roughly equivalent to "long-lived" collections,
459        // with the exception of subscribes which may or may not be long-lived. We might want to
460        // change this policy in the future to track subscribes as well.
461        if collection_id.is_transient() {
462            return None;
463        }
464
465        let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
466            collection_id.to_string(),
467            Some(self.instance_id.to_string()),
468            Some(self.replica_id.to_string()),
469        );
470
471        Some(ReplicaCollectionMetrics { wallclock_lag })
472    }
473
474    /// Observe a successful replica connection.
475    pub(crate) fn observe_connect(&self) {
476        self.inner.replica_connects_total.inc();
477    }
478
479    /// Observe time spent waiting for a replica connection.
480    pub(crate) fn observe_connect_time(&self, wait_time: Duration) {
481        self.inner
482            .replica_connect_wait_time_seconds_total
483            .inc_by(wait_time.as_secs_f64());
484    }
485}
486
487impl transport::Metrics<ComputeCommand, ComputeResponse> for ReplicaMetrics {
488    fn bytes_sent(&mut self, len: usize) {
489        self.inner
490            .command_message_bytes_total
491            .inc_by(u64::cast_from(len));
492    }
493
494    fn bytes_received(&mut self, len: usize) {
495        self.inner
496            .response_message_bytes_total
497            .inc_by(u64::cast_from(len));
498    }
499
500    fn message_sent(&mut self, msg: &ComputeCommand) {
501        self.inner.commands_total.for_command(msg).inc();
502    }
503
504    fn message_received(&mut self, msg: &ComputeResponse) {
505        self.inner.responses_total.for_response(msg).inc();
506    }
507}
508
/// Per-replica-and-collection metrics.
///
/// Currently consists only of the wallclock lag metrics.
#[derive(Debug)]
pub(crate) struct ReplicaCollectionMetrics {
    /// Metrics tracking dataflow wallclock lag.
    pub wallclock_lag: WallclockLagMetrics,
}
515
/// Metrics keyed by `ComputeCommand` type.
///
/// Holds one metric of type `M` per `ComputeCommand` variant.
#[derive(Clone, Debug)]
pub struct CommandMetrics<M> {
    /// Metrics for `Hello`.
    pub hello: M,
    /// Metrics for `CreateInstance`.
    pub create_instance: M,
    /// Metrics for `CreateDataflow`.
    pub create_dataflow: M,
    /// Metrics for `Schedule`.
    pub schedule: M,
    /// Metrics for `AllowCompaction`.
    pub allow_compaction: M,
    /// Metrics for `Peek`.
    pub peek: M,
    /// Metrics for `CancelPeek`.
    pub cancel_peek: M,
    /// Metrics for `InitializationComplete`.
    pub initialization_complete: M,
    /// Metrics for `UpdateConfiguration`.
    pub update_configuration: M,
    /// Metrics for `AllowWrites`.
    pub allow_writes: M,
}
540
541impl<M> CommandMetrics<M> {
542    /// TODO(database-issues#7533): Add documentation.
543    pub fn build<F>(build_metric: F) -> Self
544    where
545        F: Fn(&str) -> M,
546    {
547        Self {
548            hello: build_metric("hello"),
549            create_instance: build_metric("create_instance"),
550            create_dataflow: build_metric("create_dataflow"),
551            schedule: build_metric("schedule"),
552            allow_compaction: build_metric("allow_compaction"),
553            peek: build_metric("peek"),
554            cancel_peek: build_metric("cancel_peek"),
555            initialization_complete: build_metric("initialization_complete"),
556            update_configuration: build_metric("update_configuration"),
557            allow_writes: build_metric("allow_writes"),
558        }
559    }
560
561    fn for_all<F>(&self, f: F)
562    where
563        F: Fn(&M),
564    {
565        f(&self.hello);
566        f(&self.create_instance);
567        f(&self.initialization_complete);
568        f(&self.update_configuration);
569        f(&self.create_dataflow);
570        f(&self.schedule);
571        f(&self.allow_compaction);
572        f(&self.peek);
573        f(&self.cancel_peek);
574        f(&self.allow_writes);
575    }
576
577    /// TODO(database-issues#7533): Add documentation.
578    pub fn for_command(&self, command: &ComputeCommand) -> &M {
579        use ComputeCommand::*;
580
581        match command {
582            Hello { .. } => &self.hello,
583            CreateInstance(_) => &self.create_instance,
584            InitializationComplete => &self.initialization_complete,
585            UpdateConfiguration(_) => &self.update_configuration,
586            CreateDataflow(_) => &self.create_dataflow,
587            Schedule(_) => &self.schedule,
588            AllowCompaction { .. } => &self.allow_compaction,
589            Peek(_) => &self.peek,
590            CancelPeek { .. } => &self.cancel_peek,
591            AllowWrites { .. } => &self.allow_writes,
592        }
593    }
594}
595
/// Metrics keyed by `ComputeResponse` type.
///
/// Holds one metric of type `M` per `ComputeResponse` variant.
#[derive(Debug)]
struct ResponseMetrics<M> {
    /// Metrics for `Frontiers`.
    frontiers: M,
    /// Metrics for `PeekResponse`.
    peek_response: M,
    /// Metrics for `SubscribeResponse`.
    subscribe_response: M,
    /// Metrics for `CopyToResponse`.
    copy_to_response: M,
    /// Metrics for `Status`.
    status: M,
}
605
606impl<M> ResponseMetrics<M> {
607    fn build<F>(build_metric: F) -> Self
608    where
609        F: Fn(&str) -> M,
610    {
611        Self {
612            frontiers: build_metric("frontiers"),
613            peek_response: build_metric("peek_response"),
614            subscribe_response: build_metric("subscribe_response"),
615            copy_to_response: build_metric("copy_to_response"),
616            status: build_metric("status"),
617        }
618    }
619
620    fn for_response(&self, response: &ComputeResponse) -> &M {
621        use ComputeResponse::*;
622
623        match response {
624            Frontiers(..) => &self.frontiers,
625            PeekResponse(..) => &self.peek_response,
626            SubscribeResponse(..) => &self.subscribe_response,
627            CopyToResponse(..) => &self.copy_to_response,
628            Status(..) => &self.status,
629        }
630    }
631}
632
/// Metrics tracked by the command history.
///
/// Generic over the gauge type `G`; `reset` is available when `G` can be borrowed as a
/// `mz_ore::metrics::UIntGauge`.
#[derive(Debug)]
pub struct HistoryMetrics<G> {
    /// Metrics tracking command counts.
    pub command_counts: CommandMetrics<G>,
    /// Metric tracking the dataflow count.
    pub dataflow_count: G,
}
641
642impl<G> HistoryMetrics<G>
643where
644    G: Borrow<mz_ore::metrics::UIntGauge>,
645{
646    /// Reset all tracked counts to 0.
647    pub fn reset(&self) {
648        self.command_counts.for_all(|m| m.borrow().set(0));
649        self.dataflow_count.borrow().set(0);
650    }
651}
652
/// Metrics for finished peeks, keyed by peek result.
///
/// Holds one metric of type `M` per `PeekResponse` variant.
#[derive(Debug)]
pub struct PeekMetrics<M> {
    /// Metrics for `PeekResponse::Rows`.
    rows: M,
    /// Metrics for `PeekResponse::Stashed`.
    rows_stashed: M,
    /// Metrics for `PeekResponse::Error`.
    error: M,
    /// Metrics for `PeekResponse::Canceled`.
    canceled: M,
}
661
662impl<M> PeekMetrics<M> {
663    fn build<F>(build_metric: F) -> Self
664    where
665        F: Fn(&str) -> M,
666    {
667        Self {
668            rows: build_metric("rows"),
669            rows_stashed: build_metric("rows_stashed"),
670            error: build_metric("error"),
671            canceled: build_metric("canceled"),
672        }
673    }
674
675    fn for_peek_response(&self, response: &PeekResponse) -> &M {
676        use PeekResponse::*;
677
678        match response {
679            Rows(_) => &self.rows,
680            Stashed(_) => &self.rows_stashed,
681            Error(_) => &self.error,
682            Canceled => &self.canceled,
683        }
684    }
685}