1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Telemetry collection.
11//!
12//! This report loop collects two types of telemetry data on a regular interval:
13//!
14//! * Statistics, which represent aggregated activity since the last reporting
15//! interval. An example of a statistic is "number of SUBSCRIBE queries
16//! executed in the last reporting interval."
17//!
18//! * Traits, which represent the properties of the environment at the time of
19//! reporting. An example of a trait is "number of currently active
20//! SUBSCRIBE queries."
21//!
22//! The reporting loop makes two Segment API calls each interval:
23//!
24//! * A `group` API call [0] to report traits. The traits are scoped to the
25//! environment's cloud provider and region, as in:
26//!
27//! ```json
28//! {
29//! "aws": {
30//! "us-east-1": {
31//! "active_subscribes": 2,
32//! ...
33//! }
34//! }
35//! }
36//! ```
37//!
38//! Downstream tools often flatten these traits into, e.g.,
39//! `aws_us_east_1_active_subscribes`.
40//!
41//! * A `track` API call [1] for the "Environment Rolled Up" event, containing
42//! both statistics and traits as the event properties, as in:
43//!
44//! ```json
45//! {
46//! "cloud_provider": "aws",
47//! "cloud_provider_region": "us-east-1",
48//! "active_subscribes": 1,
49//! "subscribes": 23,
50//! ...
51//! }
52//! ```
53//!
54//! This event is only emitted after the *first* reporting interval has
55//! completed, since at boot all statistics will be zero.
56//!
//! Traits are included in both the `group` and `track` API calls because
//! downstream tools want easy answers to both of these questions:
59//!
60//! 1. What is the latest state of the environment?
61//! 2. What was the state of this environment in this time window?
62//!
63//! Answering question 2 requires that we periodically report statistics and
64//! traits in a `track` call. Strictly speaking, the `track` event could be used
65//! to answer question 1 too (look for the latest "Environment Rolled Up"
66//! event), but in practice it is often far more convenient to have the latest
67//! state available as a property of the environment.
68//!
69//! [0]: https://segment.com/docs/connections/spec/group/
70//! [1]: https://segment.com/docs/connections/spec/track/
7172// To test this module, you'll need to run environmentd with
73// the --segment-api-key=<REDACTED> flag. Use the API key from your personal
74// Materialize Cloud stack.
75//
76// You can then use the Segment debugger to watch the events emitted by your
77// environment in real time:
78// https://app.segment.com/materializeinc/sources/cloud_dev/debugger.
7980use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
81use mz_ore::retry::Retry;
82use mz_ore::{assert_none, task};
83use mz_repr::adt::jsonb::Jsonb;
84use mz_sql::catalog::EnvironmentId;
85use serde_json::json;
86use tokio::time::{self, Duration};
87use tracing::warn;
/// Interval between telemetry reports sent to Segment: one hour.
const REPORT_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Configuration for the telemetry report loop launched by [`start_reporting`].
#[derive(Clone)]
pub struct Config {
    /// The Segment client that receives the `group` and `track` telemetry
    /// calls.
    pub segment_client: mz_segment::Client,
    /// Adapter client used to read query metrics and to run the catalog
    /// introspection query that produces the reported traits.
    pub adapter_client: mz_adapter::Client,
    /// The ID of the environment being reported on. Its organization ID is
    /// used as the Segment user and group ID, and its cloud provider and
    /// region scope the reported traits.
    pub environment_id: EnvironmentId,
}
102103/// Starts reporting telemetry events to Segment.
104pub fn start_reporting(config: Config) {
105 task::spawn(|| "telemetry", report_loop(config));
106}
107108async fn report_loop(
109 Config {
110 segment_client,
111 adapter_client,
112 environment_id,
113 }: Config,
114) {
115struct Stats {
116 deletes: u64,
117 inserts: u64,
118 selects: u64,
119 subscribes: u64,
120 updates: u64,
121 }
122123let mut last_stats: Option<Stats> = None;
124125let mut interval = time::interval(REPORT_INTERVAL);
126loop {
127 interval.tick().await;
128129let traits = Retry::default()
130 .initial_backoff(Duration::from_secs(1))
131 .max_tries(5)
132 .retry_async(|_state| async {
133let active_subscribes = adapter_client
134 .metrics()
135 .active_subscribes
136 .with_label_values(&["user"])
137 .get();
138let mut rows = adapter_client.support_execute_one(&format!("
139 SELECT jsonb_build_object(
140 'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
141 'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
142 'active_cluster_replicas', (
143 SELECT jsonb_object_agg(base.size, coalesce(count, 0))
144 FROM mz_catalog.mz_cluster_replica_sizes base
145 LEFT JOIN (
146 SELECT r.size, count(*)::int4
147 FROM mz_cluster_replicas r
148 JOIN mz_clusters c ON c.id = r.cluster_id
149 WHERE c.id LIKE 'u%'
150 GROUP BY r.size
151 ) extant ON base.size = extant.size
152 ),
153 'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
154 'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
155 'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
156 'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
157 'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
158 'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
159 'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
160 'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
161 'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
162 'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
163 'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
164 'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
165 'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
166 'active_subscribes', {active_subscribes}
167 )",
168 )).await?;
169170let row = rows.next().expect("expected at least one row").to_owned();
171assert_none!(rows.next(), "introspection query had more than one row?");
172173let jsonb = Jsonb::from_row(row);
174Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
175 })
176 .await;
177178let traits = match traits {
179Ok(traits) => traits,
180Err(e) => {
181warn!("unable to collect telemetry traits: {e}");
182continue;
183 }
184 };
185186 segment_client.group(
187// We use the organization ID as the user ID for events
188 // that are not associated with a particular user.
189environment_id.organization_id(),
190 environment_id.organization_id(),
191json!({
192 environment_id.cloud_provider().to_string(): {
193 environment_id.cloud_provider_region(): traits,
194 }
195 }),
196 );
197198let query_total = &adapter_client.metrics().query_total;
199let current_stats = Stats {
200 deletes: query_total.with_label_values(&["user", "delete"]).get(),
201 inserts: query_total.with_label_values(&["user", "insert"]).get(),
202 updates: query_total.with_label_values(&["user", "update"]).get(),
203 selects: query_total.with_label_values(&["user", "select"]).get(),
204 subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
205 };
206if let Some(last_stats) = &last_stats {
207let mut properties = json!({
208"deletes": current_stats.deletes - last_stats.deletes,
209"inserts": current_stats.inserts - last_stats.inserts,
210"updates": current_stats.updates - last_stats.updates,
211"selects": current_stats.selects - last_stats.selects,
212"subscribes": current_stats.subscribes - last_stats.subscribes,
213 });
214 properties
215 .as_object_mut()
216 .unwrap()
217 .extend(traits.as_object().unwrap().clone());
218 segment_client.environment_track(
219&environment_id,
220"Environment Rolled Up",
221 properties,
222 EventDetails::default(),
223 );
224 }
225 last_stats = Some(current_stats);
226 }
227}