1use anyhow::bail;
81use futures::StreamExt;
82use mz_adapter::PeekResponseUnary;
83use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
84use mz_ore::collections::CollectionExt;
85use mz_ore::retry::Retry;
86use mz_ore::{soft_panic_or_log, task};
87use mz_repr::adt::jsonb::Jsonb;
88use mz_sql::catalog::EnvironmentId;
89use serde_json::json;
90use tokio::time::{self, Duration};
91
92#[derive(Clone)]
94pub struct Config {
95 pub segment_client: mz_segment::Client,
97 pub adapter_client: mz_adapter::Client,
99 pub environment_id: EnvironmentId,
101 pub report_interval: Duration,
103}
104
105pub fn start_reporting(config: Config) {
107 task::spawn(|| "telemetry", report_loop(config));
108}
109
110async fn report_loop(
111 Config {
112 segment_client,
113 adapter_client,
114 environment_id,
115 report_interval,
116 }: Config,
117) {
118 struct Stats {
119 deletes: u64,
120 inserts: u64,
121 selects: u64,
122 subscribes: u64,
123 updates: u64,
124 }
125
126 let mut last_stats: Option<Stats> = None;
127
128 let mut interval = time::interval(report_interval);
129 loop {
130 interval.tick().await;
131
132 let traits = Retry::default()
133 .initial_backoff(Duration::from_secs(1))
134 .max_tries(5)
135 .retry_async(|_state| async {
136 let active_subscribes = adapter_client
137 .metrics()
138 .active_subscribes
139 .with_label_values(&["user"])
140 .get();
141 let mut rows_stream = adapter_client.support_execute_one(&format!("
142 SELECT jsonb_build_object(
143 'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
144 'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
145 'active_cluster_replicas', (
146 SELECT jsonb_object_agg(base.size, coalesce(count, 0))
147 FROM mz_catalog.mz_cluster_replica_sizes base
148 LEFT JOIN (
149 SELECT r.size, count(*)::int4
150 FROM mz_cluster_replicas r
151 JOIN mz_clusters c ON c.id = r.cluster_id
152 WHERE c.id LIKE 'u%'
153 GROUP BY r.size
154 ) extant ON base.size = extant.size
155 ),
156 'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
157 'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
158 'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
159 'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
160 'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
161 'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
162 'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
163 'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
164 'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
165 'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
166 'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
167 'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
168 'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
169 'active_subscribes', {active_subscribes}
170 )",
171 )).await?;
172
173 let mut row_iters = Vec::new();
174
175 while let Some(rows) = rows_stream.next().await {
176 match rows {
177 PeekResponseUnary::Rows(rows) => row_iters.push(rows),
178 PeekResponseUnary::Canceled => bail!("query canceled"),
179 PeekResponseUnary::Error(e) => bail!(e),
180 }
181 }
182
183 let mut rows = Vec::new();
184 for mut row_iter in row_iters {
185 while let Some(row) = row_iter.next() {
186 rows.push(row.to_owned());
187 }
188 }
189
190 assert_eq!(1, rows.len(), "expected one row but got: {:?}", rows);
191 let row = rows.into_first();
192
193 let jsonb = Jsonb::from_row(row);
194 Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
195 })
196 .await;
197
198 let traits = match traits {
199 Ok(traits) => traits,
200 Err(e) => {
201 soft_panic_or_log!("unable to collect telemetry traits: {e}");
202 continue;
203 }
204 };
205
206 tracing::info!(?traits, "telemetry traits");
207
208 segment_client.group(
209 environment_id.organization_id(),
212 environment_id.organization_id(),
213 json!({
214 environment_id.cloud_provider().to_string(): {
215 environment_id.cloud_provider_region(): traits,
216 }
217 }),
218 );
219
220 let query_total = &adapter_client.metrics().query_total;
221 let current_stats = Stats {
222 deletes: query_total.with_label_values(&["user", "delete"]).get(),
223 inserts: query_total.with_label_values(&["user", "insert"]).get(),
224 updates: query_total.with_label_values(&["user", "update"]).get(),
225 selects: query_total.with_label_values(&["user", "select"]).get(),
226 subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
227 };
228 if let Some(last_stats) = &last_stats {
229 let mut properties = json!({
230 "deletes": current_stats.deletes - last_stats.deletes,
231 "inserts": current_stats.inserts - last_stats.inserts,
232 "updates": current_stats.updates - last_stats.updates,
233 "selects": current_stats.selects - last_stats.selects,
234 "subscribes": current_stats.subscribes - last_stats.subscribes,
235 });
236 properties
237 .as_object_mut()
238 .unwrap()
239 .extend(traits.as_object().unwrap().clone());
240 segment_client.environment_track(
241 &environment_id,
242 "Environment Rolled Up",
243 properties,
244 EventDetails::default(),
245 );
246 }
247 last_stats = Some(current_stats);
248 }
249}