1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Telemetry collection.
//!
//! This report loop collects two types of telemetry data on a regular interval:
//!
//! * Statistics, which represent aggregated activity since the last reporting
//! interval. An example of a statistic is "number of SUBSCRIBE queries
//! executed in the last reporting interval."
//!
//! * Traits, which represent the properties of the environment at the time of
//! reporting. An example of a trait is "number of currently active
//! SUBSCRIBE queries."
//!
//! The reporting loop makes two Segment API calls each interval:
//!
//! * A `group` API call [0] to report traits. The traits are scoped to the
//! environment's cloud provider and region, as in:
//!
//! ```json
//! {
//!     "aws": {
//!         "us-east-1": {
//!             "active_subscribes": 2,
//!             ...
//!         }
//!     }
//! }
//! ```
//!
//! Downstream tools often flatten these traits into, e.g.,
//! `aws_us_east_1_active_subscribes`.
//!
//! * A `track` API call [1] for the "Environment Rolled Up" event, containing
//! both statistics and traits as the event properties, as in:
//!
//! ```json
//! {
//!     "cloud_provider": "aws",
//!     "cloud_provider_region": "us-east-1",
//!     "active_subscribes": 1,
//!     "subscribes": 23,
//!     ...
//! }
//! ```
//!
//! This event is only emitted after the *first* reporting interval has
//! completed, since at boot all statistics will be zero.
//!
//! Traits are included in both the `group` and `track` API calls because
//! downstream tools want easy answers to both of these questions:
//!
//! 1. What is the latest state of the environment?
//! 2. What was the state of this environment in this time window?
//!
//! Answering question 2 requires that we periodically report statistics and
//! traits in a `track` call. Strictly speaking, the `track` event could be used
//! to answer question 1 too (look for the latest "Environment Rolled Up"
//! event), but in practice it is often far more convenient to have the latest
//! state available as a property of the environment.
//!
//! [0]: https://segment.com/docs/connections/spec/group/
//! [1]: https://segment.com/docs/connections/spec/track/
// To test this module, you'll need to run environmentd with
// the --segment-api-key=<REDACTED> flag. Use the API key from your personal
// Materialize Cloud stack.
//
// You can then use the Segment debugger to watch the events emitted by your
// environment in real time:
// https://app.segment.com/materializeinc/sources/cloud_dev/debugger.
use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
use mz_ore::retry::Retry;
use mz_ore::{assert_none, task};
use mz_repr::adt::jsonb::Jsonb;
use mz_sql::catalog::EnvironmentId;
use serde_json::json;
use tokio::time::{self, Duration};
use tracing::warn;
/// Time between consecutive telemetry reports to Segment: one hour.
const REPORT_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Telemetry configuration.
///
/// Passed by value to [`start_reporting`], which moves it into the spawned
/// reporting task.
#[derive(Clone)]
pub struct Config {
    /// The Segment client to report telemetry events to.
    pub segment_client: mz_segment::Client,
    /// A client to the adapter to introspect.
    ///
    /// Used both to run the catalog introspection query that produces the
    /// reported traits and to read the adapter's query metrics.
    pub adapter_client: mz_adapter::Client,
    /// The ID of the environment for which to report data.
    ///
    /// Its organization ID is used as the Segment user and group ID, and its
    /// cloud provider and region scope the traits reported via `group`.
    pub environment_id: EnvironmentId,
}
/// Launches the background task (named `telemetry`) that periodically
/// reports telemetry events to Segment.
///
/// Returns immediately; the spawned task's handle is not retained.
pub fn start_reporting(config: Config) {
    // Building the future does no work by itself; it only starts running
    // once the spawned task polls it.
    let reporter = report_loop(config);
    task::spawn(|| "telemetry", reporter);
}
async fn report_loop(
Config {
segment_client,
adapter_client,
environment_id,
}: Config,
) {
struct Stats {
deletes: u64,
inserts: u64,
selects: u64,
subscribes: u64,
updates: u64,
}
let mut last_stats: Option<Stats> = None;
let mut interval = time::interval(REPORT_INTERVAL);
loop {
interval.tick().await;
let traits = Retry::default()
.initial_backoff(Duration::from_secs(1))
.max_tries(5)
.retry_async(|_state| async {
let active_subscribes = adapter_client
.metrics()
.active_subscribes
.with_label_values(&["user"])
.get();
let mut rows = adapter_client.support_execute_one(&format!("
SELECT jsonb_build_object(
'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
'active_cluster_replicas', (
SELECT jsonb_object_agg(base.size, coalesce(count, 0))
FROM mz_catalog.mz_cluster_replica_sizes base
LEFT JOIN (
SELECT r.size, count(*)::int4
FROM mz_cluster_replicas r
JOIN mz_clusters c ON c.id = r.cluster_id
WHERE c.id LIKE 'u%'
GROUP BY r.size
) extant ON base.size = extant.size
),
'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
'active_subscribes', {active_subscribes}
)",
)).await?;
let row = rows.next().expect("expected at least one row").to_owned();
assert_none!(rows.next(), "introspection query had more than one row?");
let jsonb = Jsonb::from_row(row);
Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
})
.await;
let traits = match traits {
Ok(traits) => traits,
Err(e) => {
warn!("unable to collect telemetry traits: {e}");
continue;
}
};
segment_client.group(
// We use the organization ID as the user ID for events
// that are not associated with a particular user.
environment_id.organization_id(),
environment_id.organization_id(),
json!({
environment_id.cloud_provider().to_string(): {
environment_id.cloud_provider_region(): traits,
}
}),
);
let query_total = &adapter_client.metrics().query_total;
let current_stats = Stats {
deletes: query_total.with_label_values(&["user", "delete"]).get(),
inserts: query_total.with_label_values(&["user", "insert"]).get(),
updates: query_total.with_label_values(&["user", "update"]).get(),
selects: query_total.with_label_values(&["user", "select"]).get(),
subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
};
if let Some(last_stats) = &last_stats {
let mut properties = json!({
"deletes": current_stats.deletes - last_stats.deletes,
"inserts": current_stats.inserts - last_stats.inserts,
"updates": current_stats.updates - last_stats.updates,
"selects": current_stats.selects - last_stats.selects,
"subscribes": current_stats.subscribes - last_stats.subscribes,
});
properties
.as_object_mut()
.unwrap()
.extend(traits.as_object().unwrap().clone());
segment_client.environment_track(
&environment_id,
"Environment Rolled Up",
properties,
EventDetails::default(),
);
}
last_stats = Some(current_stats);
}
}