1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Telemetry collection.
//!
//! This report loop collects two types of telemetry data on a regular interval:
//!
//!   * Statistics, which represent aggregated activity since the last reporting
//!     interval. An example of a statistic is "number of SUBSCRIBE queries
//!     executed in the last reporting interval."
//!
//!   * Traits, which represent the properties of the environment at the time of
//!     reporting. An example of a trait is "number of currently active
//!     SUBSCRIBE queries."
//!
//! The reporting loop makes two Segment API calls each interval:
//!
//!   * A `group` API call [0] to report traits. The traits are scoped to the
//!     environment's cloud provider and region, as in:
//!
//!     ```json
//!     {
//!         "aws": {
//!             "us-east-1": {
//!                 "active_subscribes": 2,
//!                 ...
//!             }
//!         }
//!     }
//!     ```
//!
//!     Downstream tools often flatten these traits into, e.g.,
//!     `aws_us_east_1_active_subscribes`.
//!
//!  * A `track` API call [1] for the "Environment Rolled Up" event, containing
//!    both statistics and traits as the event properties, as in:
//!
//!    ```json
//!    {
//!        "cloud_provider": "aws",
//!        "cloud_provider_region": "us-east-1",
//!        "active_subscribes": 1,
//!        "subscribes": 23,
//!        ...
//!    }
//!    ```
//!
//!    This event is only emitted after the *first* reporting interval has
//!    completed, since at boot all statistics will be zero.
//!
//! Traits are included in both the `group` and `track` API calls because
//! downstream tools want easy answers to both of these questions:
//!
//!   1. What is the latest state of the environment?
//!   2. What was the state of this environment in this time window?
//!
//! Answering question 2 requires that we periodically report statistics and
//! traits in a `track` call. Strictly speaking, the `track` event could be used
//! to answer question 1 too (look for the latest "Environment Rolled Up"
//! event), but in practice it is often far more convenient to have the latest
//! state available as a property of the environment.
//!
//! [0]: https://segment.com/docs/connections/spec/group/
//! [1]: https://segment.com/docs/connections/spec/track/

// To test this module, you'll need to run environmentd with
// the --segment-api-key=<REDACTED> flag. Use the API key from your personal
// Materialize Cloud stack.
//
// You can then use the Segment debugger to watch the events emitted by your
// environment in real time:
// https://app.segment.com/materializeinc/sources/cloud_dev/debugger.

use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
use mz_ore::retry::Retry;
use mz_ore::{assert_none, task};
use mz_repr::adt::jsonb::Jsonb;
use mz_sql::catalog::EnvironmentId;
use serde_json::json;
use tokio::time::{self, Duration};
use tracing::warn;

/// The period between consecutive telemetry reports to Segment (one hour).
const REPORT_INTERVAL: Duration = Duration::from_secs(60 * 60);

/// Telemetry configuration.
///
/// Passed by value to [`start_reporting`], which hands it to the background
/// reporting task.
#[derive(Clone)]
pub struct Config {
    /// The Segment client to report telemetry events to.
    ///
    /// Receives both the `group` call and the "Environment Rolled Up" `track`
    /// event.
    pub segment_client: mz_segment::Client,
    /// A client to the adapter to introspect.
    ///
    /// Used both to run the introspection query that gathers traits and to
    /// read the query metrics that back the reported statistics.
    pub adapter_client: mz_adapter::Client,
    /// The ID of the environment for which to report data.
    ///
    /// Supplies the organization ID, cloud provider, and cloud provider region
    /// attached to each report.
    pub environment_id: EnvironmentId,
}

/// Starts reporting telemetry events to Segment.
///
/// Spawns the reporting loop as a task named "telemetry" and returns
/// immediately; the handle returned by the spawn is not kept.
pub fn start_reporting(config: Config) {
    let reporter = report_loop(config);
    task::spawn(|| "telemetry", reporter);
}

async fn report_loop(
    Config {
        segment_client,
        adapter_client,
        environment_id,
    }: Config,
) {
    struct Stats {
        deletes: u64,
        inserts: u64,
        selects: u64,
        subscribes: u64,
        updates: u64,
    }

    let mut last_stats: Option<Stats> = None;

    let mut interval = time::interval(REPORT_INTERVAL);
    loop {
        interval.tick().await;

        let traits = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .max_tries(5)
            .retry_async(|_state| async {
                let active_subscribes = adapter_client
                    .metrics()
                    .active_subscribes
                    .with_label_values(&["user"])
                    .get();
                let mut rows = adapter_client.support_execute_one(&format!("
                    SELECT jsonb_build_object(
                        'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
                        'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
                        'active_cluster_replicas', (
                            SELECT jsonb_object_agg(base.size, coalesce(count, 0))
                            FROM mz_catalog.mz_cluster_replica_sizes base
                            LEFT JOIN (
                                SELECT r.size, count(*)::int4
                                FROM mz_cluster_replicas r
                                JOIN mz_clusters c ON c.id = r.cluster_id
                                WHERE c.id LIKE 'u%'
                                GROUP BY r.size
                            ) extant ON base.size = extant.size
                        ),
                        'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
                        'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
                        'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
                        'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
                        'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
                        'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
                        'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
                        'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
                        'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
                        'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
                        'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
                        'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
                        'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
                        'active_subscribes', {active_subscribes}
                    )",
                )).await?;

                let row = rows.next().expect("expected at least one row").to_owned();
                assert_none!(rows.next(), "introspection query had more than one row?");

                let jsonb = Jsonb::from_row(row);
                Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
            })
            .await;

        let traits = match traits {
            Ok(traits) => traits,
            Err(e) => {
                warn!("unable to collect telemetry traits: {e}");
                continue;
            }
        };

        segment_client.group(
            // We use the organization ID as the user ID for events
            // that are not associated with a particular user.
            environment_id.organization_id(),
            environment_id.organization_id(),
            json!({
                environment_id.cloud_provider().to_string(): {
                    environment_id.cloud_provider_region(): traits,
                }
            }),
        );

        let query_total = &adapter_client.metrics().query_total;
        let current_stats = Stats {
            deletes: query_total.with_label_values(&["user", "delete"]).get(),
            inserts: query_total.with_label_values(&["user", "insert"]).get(),
            updates: query_total.with_label_values(&["user", "update"]).get(),
            selects: query_total.with_label_values(&["user", "select"]).get(),
            subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
        };
        if let Some(last_stats) = &last_stats {
            let mut properties = json!({
                "deletes": current_stats.deletes - last_stats.deletes,
                "inserts": current_stats.inserts - last_stats.inserts,
                "updates": current_stats.updates - last_stats.updates,
                "selects": current_stats.selects - last_stats.selects,
                "subscribes": current_stats.subscribes - last_stats.subscribes,
            });
            properties
                .as_object_mut()
                .unwrap()
                .extend(traits.as_object().unwrap().clone());
            segment_client.environment_track(
                &environment_id,
                "Environment Rolled Up",
                properties,
                EventDetails::default(),
            );
        }
        last_stats = Some(current_stats);
    }
}