Skip to main content

mz_environmentd/
telemetry.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Telemetry collection.
11//!
12//! This report loop collects two types of telemetry data on a regular interval:
13//!
14//!   * Statistics, which represent aggregated activity since the last reporting
15//!     interval. An example of a statistic is "number of SUBSCRIBE queries
16//!     executed in the last reporting interval."
17//!
18//!   * Traits, which represent the properties of the environment at the time of
19//!     reporting. An example of a trait is "number of currently active
20//!     SUBSCRIBE queries."
21//!
22//! The reporting loop makes two Segment API calls each interval:
23//!
24//!   * A `group` API call [0] to report traits. The traits are scoped to the
25//!     environment's cloud provider and region, as in:
26//!
27//!     ```json
28//!     {
29//!         "aws": {
30//!             "us-east-1": {
31//!                 "active_subscribes": 2,
32//!                 ...
33//!             }
34//!         }
35//!     }
36//!     ```
37//!
38//!     Downstream tools often flatten these traits into, e.g.,
39//!     `aws_us_east_1_active_subscribes`.
40//!
41//!  * A `track` API call [1] for the "Environment Rolled Up" event, containing
42//!    both statistics and traits as the event properties, as in:
43//!
44//!    ```json
45//!    {
46//!        "cloud_provider": "aws",
47//!        "cloud_provider_region": "us-east-1",
48//!        "active_subscribes": 1,
49//!        "subscribes": 23,
50//!        ...
51//!    }
52//!    ```
53//!
54//!    This event is only emitted after the *first* reporting interval has
55//!    completed, since at boot all statistics will be zero.
56//!
//! Traits are included in both the `group` and `track` API calls because
//! downstream tools want easy answers to both of these questions:
59//!
60//!   1. What is the latest state of the environment?
61//!   2. What was the state of this environment in this time window?
62//!
63//! Answering question 2 requires that we periodically report statistics and
64//! traits in a `track` call. Strictly speaking, the `track` event could be used
65//! to answer question 1 too (look for the latest "Environment Rolled Up"
66//! event), but in practice it is often far more convenient to have the latest
67//! state available as a property of the environment.
68//!
69//! [0]: https://segment.com/docs/connections/spec/group/
70//! [1]: https://segment.com/docs/connections/spec/track/
71
72// To test this module, you'll need to run environmentd with
73// the --segment-api-key=<REDACTED> flag. Use the API key from your personal
74// Materialize Cloud stack.
75//
76// You can then use the Segment debugger to watch the events emitted by your
77// environment in real time:
78// https://app.segment.com/materializeinc/sources/cloud_dev/debugger.
79
80use anyhow::bail;
81use futures::StreamExt;
82use mz_adapter::PeekResponseUnary;
83use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
84use mz_ore::collections::CollectionExt;
85use mz_ore::retry::Retry;
86use mz_ore::{soft_panic_or_log, task};
87use mz_repr::adt::jsonb::Jsonb;
88use mz_sql::catalog::EnvironmentId;
89use serde_json::json;
90use tokio::time::{self, Duration};
91
92/// Telemetry configuration.
93#[derive(Clone)]
94pub struct Config {
95    /// The Segment client to report telemetry events to.
96    pub segment_client: mz_segment::Client,
97    /// A client to the adapter to introspect.
98    pub adapter_client: mz_adapter::Client,
99    /// The ID of the environment for which to report data.
100    pub environment_id: EnvironmentId,
101    /// How frequently to send a summary to Segment.
102    pub report_interval: Duration,
103}
104
105/// Starts reporting telemetry events to Segment.
106pub fn start_reporting(config: Config) {
107    task::spawn(|| "telemetry", report_loop(config));
108}
109
110async fn report_loop(
111    Config {
112        segment_client,
113        adapter_client,
114        environment_id,
115        report_interval,
116    }: Config,
117) {
118    struct Stats {
119        deletes: u64,
120        inserts: u64,
121        selects: u64,
122        subscribes: u64,
123        updates: u64,
124    }
125
126    let mut last_stats: Option<Stats> = None;
127
128    let mut interval = time::interval(report_interval);
129    loop {
130        interval.tick().await;
131
132        let traits = Retry::default()
133            .initial_backoff(Duration::from_secs(1))
134            .max_tries(5)
135            .retry_async(|_state| async {
136                let active_subscribes = adapter_client
137                    .metrics()
138                    .active_subscribes
139                    .with_label_values(&["user"])
140                    .get();
141                let mut rows_stream = adapter_client.support_execute_one(&format!("
142                    SELECT jsonb_build_object(
143                        'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
144                        'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
145                        'active_cluster_replicas', (
146                            SELECT jsonb_object_agg(base.size, coalesce(count, 0))
147                            FROM mz_catalog.mz_cluster_replica_sizes base
148                            LEFT JOIN (
149                                SELECT r.size, count(*)::int4
150                                FROM mz_cluster_replicas r
151                                JOIN mz_clusters c ON c.id = r.cluster_id
152                                WHERE c.id LIKE 'u%'
153                                GROUP BY r.size
154                            ) extant ON base.size = extant.size
155                        ),
156                        'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
157                        'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
158                        'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
159                        'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
160                        'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
161                        'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
162                        'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
163                        'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
164                        'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
165                        'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
166                        'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
167                        'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
168                        'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
169                        'active_subscribes', {active_subscribes}
170                    )",
171                )).await?;
172
173                let mut row_iters = Vec::new();
174
175                while let Some(rows) = rows_stream.next().await {
176                    match rows {
177                        PeekResponseUnary::Rows(rows) => row_iters.push(rows),
178                        PeekResponseUnary::Canceled => bail!("query canceled"),
179                        PeekResponseUnary::Error(e) => bail!(e),
180                        PeekResponseUnary::DependencyDropped(dep) => {
181                            bail!("{}", dep.query_terminated_error())
182                        }
183                    }
184                }
185
186                let mut rows = Vec::new();
187                for mut row_iter in row_iters {
188                    while let Some(row) = row_iter.next() {
189                        rows.push(row.to_owned());
190                    }
191                }
192
193                assert_eq!(1, rows.len(), "expected one row but got: {:?}", rows);
194                let row = rows.into_first();
195
196                let jsonb = Jsonb::from_row(row);
197                Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
198            })
199            .await;
200
201        let traits = match traits {
202            Ok(traits) => traits,
203            Err(e) => {
204                soft_panic_or_log!("unable to collect telemetry traits: {e}");
205                continue;
206            }
207        };
208
209        tracing::info!(?traits, "telemetry traits");
210
211        segment_client.group(
212            // We use the organization ID as the user ID for events
213            // that are not associated with a particular user.
214            environment_id.organization_id(),
215            environment_id.organization_id(),
216            json!({
217                environment_id.cloud_provider().to_string(): {
218                    environment_id.cloud_provider_region(): traits,
219                }
220            }),
221        );
222
223        let query_total = &adapter_client.metrics().query_total;
224        let current_stats = Stats {
225            deletes: query_total.with_label_values(&["user", "delete"]).get(),
226            inserts: query_total.with_label_values(&["user", "insert"]).get(),
227            updates: query_total.with_label_values(&["user", "update"]).get(),
228            selects: query_total.with_label_values(&["user", "select"]).get(),
229            subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
230        };
231        if let Some(last_stats) = &last_stats {
232            let mut properties = json!({
233                "deletes": current_stats.deletes - last_stats.deletes,
234                "inserts": current_stats.inserts - last_stats.inserts,
235                "updates": current_stats.updates - last_stats.updates,
236                "selects": current_stats.selects - last_stats.selects,
237                "subscribes": current_stats.subscribes - last_stats.subscribes,
238            });
239            properties
240                .as_object_mut()
241                .unwrap()
242                .extend(traits.as_object().unwrap().clone());
243            segment_client.environment_track(
244                &environment_id,
245                "Environment Rolled Up",
246                properties,
247                EventDetails::default(),
248            );
249        }
250        last_stats = Some(current_stats);
251    }
252}