mz_environmentd/
telemetry.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Telemetry collection.
11//!
12//! This report loop collects two types of telemetry data on a regular interval:
13//!
14//!   * Statistics, which represent aggregated activity since the last reporting
15//!     interval. An example of a statistic is "number of SUBSCRIBE queries
16//!     executed in the last reporting interval."
17//!
18//!   * Traits, which represent the properties of the environment at the time of
19//!     reporting. An example of a trait is "number of currently active
20//!     SUBSCRIBE queries."
21//!
22//! The reporting loop makes two Segment API calls each interval:
23//!
24//!   * A `group` API call [0] to report traits. The traits are scoped to the
25//!     environment's cloud provider and region, as in:
26//!
27//!     ```json
28//!     {
29//!         "aws": {
30//!             "us-east-1": {
31//!                 "active_subscribes": 2,
32//!                 ...
33//!             }
34//!         }
35//!     }
36//!     ```
37//!
38//!     Downstream tools often flatten these traits into, e.g.,
39//!     `aws_us_east_1_active_subscribes`.
40//!
41//!  * A `track` API call [1] for the "Environment Rolled Up" event, containing
42//!    both statistics and traits as the event properties, as in:
43//!
44//!    ```json
45//!    {
46//!        "cloud_provider": "aws",
47//!        "cloud_provider_region": "us-east-1",
48//!        "active_subscribes": 1,
49//!        "subscribes": 23,
50//!        ...
51//!    }
52//!    ```
53//!
54//!    This event is only emitted after the *first* reporting interval has
55//!    completed, since at boot all statistics will be zero.
56//!
//! Traits are included in both the `group` and `track` API calls because
//! downstream tools want easy answers to both of these questions:
59//!
60//!   1. What is the latest state of the environment?
61//!   2. What was the state of this environment in this time window?
62//!
63//! Answering question 2 requires that we periodically report statistics and
64//! traits in a `track` call. Strictly speaking, the `track` event could be used
65//! to answer question 1 too (look for the latest "Environment Rolled Up"
66//! event), but in practice it is often far more convenient to have the latest
67//! state available as a property of the environment.
68//!
69//! [0]: https://segment.com/docs/connections/spec/group/
70//! [1]: https://segment.com/docs/connections/spec/track/
71
72// To test this module, you'll need to run environmentd with
73// the --segment-api-key=<REDACTED> flag. Use the API key from your personal
74// Materialize Cloud stack.
75//
76// You can then use the Segment debugger to watch the events emitted by your
77// environment in real time:
78// https://app.segment.com/materializeinc/sources/cloud_dev/debugger.
79
80use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
81use mz_ore::retry::Retry;
82use mz_ore::{assert_none, task};
83use mz_repr::adt::jsonb::Jsonb;
84use mz_sql::catalog::EnvironmentId;
85use serde_json::json;
86use tokio::time::{self, Duration};
87use tracing::warn;
88
/// Time between consecutive telemetry reports to Segment: one hour.
const REPORT_INTERVAL: Duration = Duration::from_secs(60 * 60);
91
92/// Telemetry configuration.
93#[derive(Clone)]
94pub struct Config {
95    /// The Segment client to report telemetry events to.
96    pub segment_client: mz_segment::Client,
97    /// A client to the adapter to introspect.
98    pub adapter_client: mz_adapter::Client,
99    /// The ID of the environment for which to report data.
100    pub environment_id: EnvironmentId,
101}
102
103/// Starts reporting telemetry events to Segment.
104pub fn start_reporting(config: Config) {
105    task::spawn(|| "telemetry", report_loop(config));
106}
107
108async fn report_loop(
109    Config {
110        segment_client,
111        adapter_client,
112        environment_id,
113    }: Config,
114) {
115    struct Stats {
116        deletes: u64,
117        inserts: u64,
118        selects: u64,
119        subscribes: u64,
120        updates: u64,
121    }
122
123    let mut last_stats: Option<Stats> = None;
124
125    let mut interval = time::interval(REPORT_INTERVAL);
126    loop {
127        interval.tick().await;
128
129        let traits = Retry::default()
130            .initial_backoff(Duration::from_secs(1))
131            .max_tries(5)
132            .retry_async(|_state| async {
133                let active_subscribes = adapter_client
134                    .metrics()
135                    .active_subscribes
136                    .with_label_values(&["user"])
137                    .get();
138                let mut rows = adapter_client.support_execute_one(&format!("
139                    SELECT jsonb_build_object(
140                        'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
141                        'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
142                        'active_cluster_replicas', (
143                            SELECT jsonb_object_agg(base.size, coalesce(count, 0))
144                            FROM mz_catalog.mz_cluster_replica_sizes base
145                            LEFT JOIN (
146                                SELECT r.size, count(*)::int4
147                                FROM mz_cluster_replicas r
148                                JOIN mz_clusters c ON c.id = r.cluster_id
149                                WHERE c.id LIKE 'u%'
150                                GROUP BY r.size
151                            ) extant ON base.size = extant.size
152                        ),
153                        'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
154                        'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
155                        'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
156                        'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
157                        'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
158                        'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
159                        'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
160                        'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
161                        'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
162                        'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
163                        'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
164                        'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
165                        'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
166                        'active_subscribes', {active_subscribes}
167                    )",
168                )).await?;
169
170                let row = rows.next().expect("expected at least one row").to_owned();
171                assert_none!(rows.next(), "introspection query had more than one row?");
172
173                let jsonb = Jsonb::from_row(row);
174                Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
175            })
176            .await;
177
178        let traits = match traits {
179            Ok(traits) => traits,
180            Err(e) => {
181                warn!("unable to collect telemetry traits: {e}");
182                continue;
183            }
184        };
185
186        segment_client.group(
187            // We use the organization ID as the user ID for events
188            // that are not associated with a particular user.
189            environment_id.organization_id(),
190            environment_id.organization_id(),
191            json!({
192                environment_id.cloud_provider().to_string(): {
193                    environment_id.cloud_provider_region(): traits,
194                }
195            }),
196        );
197
198        let query_total = &adapter_client.metrics().query_total;
199        let current_stats = Stats {
200            deletes: query_total.with_label_values(&["user", "delete"]).get(),
201            inserts: query_total.with_label_values(&["user", "insert"]).get(),
202            updates: query_total.with_label_values(&["user", "update"]).get(),
203            selects: query_total.with_label_values(&["user", "select"]).get(),
204            subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
205        };
206        if let Some(last_stats) = &last_stats {
207            let mut properties = json!({
208                "deletes": current_stats.deletes - last_stats.deletes,
209                "inserts": current_stats.inserts - last_stats.inserts,
210                "updates": current_stats.updates - last_stats.updates,
211                "selects": current_stats.selects - last_stats.selects,
212                "subscribes": current_stats.subscribes - last_stats.subscribes,
213            });
214            properties
215                .as_object_mut()
216                .unwrap()
217                .extend(traits.as_object().unwrap().clone());
218            segment_client.environment_track(
219                &environment_id,
220                "Environment Rolled Up",
221                properties,
222                EventDetails::default(),
223            );
224        }
225        last_stats = Some(current_stats);
226    }
227}