1use anyhow::bail;
81use futures::StreamExt;
82use mz_adapter::PeekResponseUnary;
83use mz_adapter::telemetry::{EventDetails, SegmentClientExt};
84use mz_ore::collections::CollectionExt;
85use mz_ore::retry::Retry;
86use mz_ore::{soft_panic_or_log, task};
87use mz_repr::adt::jsonb::Jsonb;
88use mz_sql::catalog::EnvironmentId;
89use serde_json::json;
90use tokio::time::{self, Duration};
91
#[derive(Clone)]
/// Configuration for the telemetry reporting loop started by
/// [`start_reporting`].
pub struct Config {
    /// Segment client; the reporting loop calls `group` and
    /// `environment_track` on it to emit telemetry.
    pub segment_client: mz_segment::Client,
    /// Adapter client; the reporting loop runs its traits query through it
    /// and reads the `active_subscribes` and `query_total` metrics from it.
    pub adapter_client: mz_adapter::Client,
    /// Identifier of the environment being reported on. The reporting loop
    /// reads its organization ID, cloud provider, and cloud provider region.
    pub environment_id: EnvironmentId,
    /// Period of the timer that drives the reporting loop.
    pub report_interval: Duration,
}
104
105pub fn start_reporting(config: Config) {
107 task::spawn(|| "telemetry", report_loop(config));
108}
109
110async fn report_loop(
111 Config {
112 segment_client,
113 adapter_client,
114 environment_id,
115 report_interval,
116 }: Config,
117) {
118 struct Stats {
119 deletes: u64,
120 inserts: u64,
121 selects: u64,
122 subscribes: u64,
123 updates: u64,
124 }
125
126 let mut last_stats: Option<Stats> = None;
127
128 let mut interval = time::interval(report_interval);
129 loop {
130 interval.tick().await;
131
132 let traits = Retry::default()
133 .initial_backoff(Duration::from_secs(1))
134 .max_tries(5)
135 .retry_async(|_state| async {
136 let active_subscribes = adapter_client
137 .metrics()
138 .active_subscribes
139 .with_label_values(&["user"])
140 .get();
141 let mut rows_stream = adapter_client.support_execute_one(&format!("
142 SELECT jsonb_build_object(
143 'active_aws_privatelink_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'aws-privatelink')::int4,
144 'active_clusters', (SELECT count(*) FROM mz_clusters WHERE id LIKE 'u%')::int4,
145 'active_cluster_replicas', (
146 SELECT jsonb_object_agg(base.size, coalesce(count, 0))
147 FROM mz_catalog.mz_cluster_replica_sizes base
148 LEFT JOIN (
149 SELECT r.size, count(*)::int4
150 FROM mz_cluster_replicas r
151 JOIN mz_clusters c ON c.id = r.cluster_id
152 WHERE c.id LIKE 'u%'
153 GROUP BY r.size
154 ) extant ON base.size = extant.size
155 ),
156 'active_confluent_schema_registry_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'confluent-schema-registry')::int4,
157 'active_materialized_views', (SELECT count(*) FROM mz_materialized_views WHERE id LIKE 'u%')::int4,
158 'active_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type <> 'subsource')::int4,
159 'active_kafka_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'kafka')::int4,
160 'active_kafka_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'kafka')::int4,
161 'active_load_generator_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'load-generator')::int4,
162 'active_postgres_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'postgres')::int4,
163 'active_postgres_sources', (SELECT count(*) FROM mz_sources WHERE id LIKE 'u%' AND type = 'postgres')::int4,
164 'active_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%')::int4,
165 'active_ssh_tunnel_connections', (SELECT count(*) FROM mz_connections WHERE id LIKE 'u%' AND type = 'ssh-tunnel')::int4,
166 'active_kafka_sinks', (SELECT count(*) FROM mz_sinks WHERE id LIKE 'u%' AND type = 'kafka')::int4,
167 'active_tables', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')::int4,
168 'active_views', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')::int4,
169 'active_subscribes', {active_subscribes}
170 )",
171 )).await?;
172
173 let mut row_iters = Vec::new();
174
175 while let Some(rows) = rows_stream.next().await {
176 match rows {
177 PeekResponseUnary::Rows(rows) => row_iters.push(rows),
178 PeekResponseUnary::Canceled => bail!("query canceled"),
179 PeekResponseUnary::Error(e) => bail!(e),
180 PeekResponseUnary::DependencyDropped(dep) => {
181 bail!("{}", dep.query_terminated_error())
182 }
183 }
184 }
185
186 let mut rows = Vec::new();
187 for mut row_iter in row_iters {
188 while let Some(row) = row_iter.next() {
189 rows.push(row.to_owned());
190 }
191 }
192
193 assert_eq!(1, rows.len(), "expected one row but got: {:?}", rows);
194 let row = rows.into_first();
195
196 let jsonb = Jsonb::from_row(row);
197 Ok::<_, anyhow::Error>(jsonb.as_ref().to_serde_json())
198 })
199 .await;
200
201 let traits = match traits {
202 Ok(traits) => traits,
203 Err(e) => {
204 soft_panic_or_log!("unable to collect telemetry traits: {e}");
205 continue;
206 }
207 };
208
209 tracing::info!(?traits, "telemetry traits");
210
211 segment_client.group(
212 environment_id.organization_id(),
215 environment_id.organization_id(),
216 json!({
217 environment_id.cloud_provider().to_string(): {
218 environment_id.cloud_provider_region(): traits,
219 }
220 }),
221 );
222
223 let query_total = &adapter_client.metrics().query_total;
224 let current_stats = Stats {
225 deletes: query_total.with_label_values(&["user", "delete"]).get(),
226 inserts: query_total.with_label_values(&["user", "insert"]).get(),
227 updates: query_total.with_label_values(&["user", "update"]).get(),
228 selects: query_total.with_label_values(&["user", "select"]).get(),
229 subscribes: query_total.with_label_values(&["user", "subscribe"]).get(),
230 };
231 if let Some(last_stats) = &last_stats {
232 let mut properties = json!({
233 "deletes": current_stats.deletes - last_stats.deletes,
234 "inserts": current_stats.inserts - last_stats.inserts,
235 "updates": current_stats.updates - last_stats.updates,
236 "selects": current_stats.selects - last_stats.selects,
237 "subscribes": current_stats.subscribes - last_stats.subscribes,
238 });
239 properties
240 .as_object_mut()
241 .unwrap()
242 .extend(traits.as_object().unwrap().clone());
243 segment_client.environment_track(
244 &environment_id,
245 "Environment Rolled Up",
246 properties,
247 EventDetails::default(),
248 );
249 }
250 last_stats = Some(current_stats);
251 }
252}