mz_environmentd/
telemetry.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Telemetry collection.
11//!
12//! This report loop collects two types of telemetry data on a regular interval:
13//!
14//!   * Statistics, which represent aggregated activity since the last reporting
15//!     interval. An example of a statistic is "number of SUBSCRIBE queries
16//!     executed in the last reporting interval."
17//!
18//!   * Traits, which represent the properties of the environment at the time of
19//!     reporting. An example of a trait is "number of currently active
20//!     SUBSCRIBE queries."
21//!
22//! The reporting loop makes two Segment API calls each interval:
23//!
24//!   * A `group` API call [0] to report traits. The traits are scoped to the
25//!     environment's cloud provider and region, as in:
26//!
27//!     ```json
28//!     {
29//!         "aws": {
30//!             "us-east-1": {
31//!                 "active_subscribes": 2,
32//!                 ...
33//!             }
34//!         }
35//!     }
36//!     ```
37//!
38//!     Downstream tools often flatten these traits into, e.g.,
39//!     `aws_us_east_1_active_subscribes`.
40//!
//!   * A `track` API call [1] for the "Environment Rolled Up" event, containing
//!     both statistics and traits as the event properties, as in:
//!
//!     ```json
//!     {
//!         "cloud_provider": "aws",
//!         "cloud_provider_region": "us-east-1",
//!         "active_subscribes": 1,
//!         "subscribes": 23,
//!         ...
//!     }
//!     ```
//!
//!     This event is only emitted after the *first* reporting interval has
//!     completed, since at boot all statistics will be zero.
56//!
//! Traits are included in both the `group` and `track` API calls because
//! downstream tools want easy answers to both of these questions:
59//!
60//!   1. What is the latest state of the environment?
61//!   2. What was the state of this environment in this time window?
62//!
63//! Answering question 2 requires that we periodically report statistics and
64//! traits in a `track` call. Strictly speaking, the `track` event could be used
65//! to answer question 1 too (look for the latest "Environment Rolled Up"
66//! event), but in practice it is often far more convenient to have the latest
67//! state available as a property of the environment.
68//!
69//! [0]: https://segment.com/docs/connections/spec/group/
70//! [1]: https://segment.com/docs/connections/spec/track/
71
72// To test this module, you'll need to run environmentd with
73// the --segment-api-key=<REDACTED> flag. Use the API key from your personal
74// Materialize Cloud stack.
75//
76// You can then use the Segment debugger to watch the events emitted by your
77// environment in real time:
78// https://app.segment.com/materializeinc/sources/cloud_dev/debugger.
79
80use anyhow::bail;
81use futures::StreamExt;
82use mz_adapter::PeekResponseUnary;
83use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
84use mz_ore::collections::CollectionExt;
85use mz_ore::retry::Retry;
86use mz_ore::{soft_panic_or_log, task};
87use mz_repr::adt::jsonb::Jsonb;
88use mz_sql::catalog::EnvironmentId;
89use serde_json::json;
90use tokio::time::{self, Duration};
91
92/// Telemetry configuration.
93#[derive(Clone)]
94pub struct Config {
95    /// The Segment client to report telemetry events to.
96    pub segment_client: mz_segment::Client,
97    /// A client to the adapter to introspect.
98    pub adapter_client: mz_adapter::Client,
99    /// The ID of the environment for which to report data.
100    pub environment_id: EnvironmentId,
101    /// How frequently to send a summary to Segment.
102    pub report_interval: Duration,
103}
104
105/// Starts reporting telemetry events to Segment.
106pub fn start_reporting(config: Config) {
107    task::spawn(|| "telemetry", report_loop(config));
108}
109
110async fn report_loop(
111    Config {
112        segment_client,
113        adapter_client,
114        environment_id,
115        report_interval,
116    }: Config,
117) {
118    struct Stats {
119        deletes: u64,
120        inserts: u64,
121        selects: u64,
122        subscribes: u64,
123        updates: u64,
124    }
125
126    let mut last_stats: Option<Stats> = None;
127
128    let mut interval = time::interval(report_interval);
129    loop {
130        interval.tick().await;
131
132        let traits = Retry::default()
133            .initial_backoff(Duration::from_secs(1))
134            .max_tries(5)
135            .retry_async(|_state| async {
136                let active_subscribes = adapter_client
137                    .metrics()
138                    .active_subscribes
139                    .with_label_values(&["user"])
140                    .get();
141                let mut rows_stream = adapter_client.support_execute_one(&format!("
142                    SELECT jsonb_build_object(
143                        'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
144                        'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
145                        'active_cluster_replicas', (
146                            SELECT jsonb_object_agg(base.size, coalesce(count, 0))
147                            FROM mz_catalog.mz_cluster_replica_sizes base
148                            LEFT JOIN (
149                                SELECT r.size, count(*)::int4
150                                FROM mz_cluster_replicas r
151                                JOIN mz_clusters c ON c.id = r.cluster_id
152                                WHERE c.id LIKE 'u%'
153                                GROUP BY r.size
154                            ) extant ON base.size = extant.size
155                        ),
156                        'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
157                        'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
158                        'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
159                        'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
160                        'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
161                        'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
162                        'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
163                        'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
164                        'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
165                        'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
166                        'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
167                        'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
168                        'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
169                        'active_subscribes', {active_subscribes}
170                    )",
171                )).await?;
172
173                let mut row_iters = Vec::new();
174
175                while let Some(rows) = rows_stream.next().await {
176                    match rows {
177                        PeekResponseUnary::Rows(rows) => row_iters.push(rows),
178                        PeekResponseUnary::Canceled => bail!("query canceled"),
179                        PeekResponseUnary::Error(e) => bail!(e),
180                    }
181                }
182
183                let mut rows = Vec::new();
184                for mut row_iter in row_iters {
185                    while let Some(row) = row_iter.next() {
186                        rows.push(row.to_owned());
187                    }
188                }
189
190                assert_eq!(1, rows.len(), "expected one row but got: {:?}", rows);
191                let row = rows.into_first();
192
193                let jsonb = Jsonb::from_row(row);
194                Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
195            })
196            .await;
197
198        let traits = match traits {
199            Ok(traits) => traits,
200            Err(e) => {
201                soft_panic_or_log!("unable to collect telemetry traits: {e}");
202                continue;
203            }
204        };
205
206        tracing::info!(?traits, "telemetry traits");
207
208        segment_client.group(
209            // We use the organization ID as the user ID for events
210            // that are not associated with a particular user.
211            environment_id.organization_id(),
212            environment_id.organization_id(),
213            json!({
214                environment_id.cloud_provider().to_string(): {
215                    environment_id.cloud_provider_region(): traits,
216                }
217            }),
218        );
219
220        let query_total = &adapter_client.metrics().query_total;
221        let current_stats = Stats {
222            deletes: query_total.with_label_values(&["user", "delete"]).get(),
223            inserts: query_total.with_label_values(&["user", "insert"]).get(),
224            updates: query_total.with_label_values(&["user", "update"]).get(),
225            selects: query_total.with_label_values(&["user", "select"]).get(),
226            subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
227        };
228        if let Some(last_stats) = &last_stats {
229            let mut properties = json!({
230                "deletes": current_stats.deletes - last_stats.deletes,
231                "inserts": current_stats.inserts - last_stats.inserts,
232                "updates": current_stats.updates - last_stats.updates,
233                "selects": current_stats.selects - last_stats.selects,
234                "subscribes": current_stats.subscribes - last_stats.subscribes,
235            });
236            properties
237                .as_object_mut()
238                .unwrap()
239                .extend(traits.as_object().unwrap().clone());
240            segment_client.environment_track(
241                &environment_id,
242                "Environment Rolled Up",
243                properties,
244                EventDetails::default(),
245            );
246        }
247        last_stats = Some(current_stats);
248    }
249}