1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use serde::Deserialize;
use tokio::time::{self, Duration};
use tracing::{debug, warn};
use uuid::Uuid;
use mz_ore::retry::Retry;
use crate::BUILD_INFO;
pub struct Config {
pub domain: String,
pub interval: Duration,
pub cluster_id: Uuid,
pub workers: usize,
pub coord_client: mz_coord::Client,
}
pub async fn report_loop(config: Config) {
let mut interval = time::interval(config.interval);
let mut reported_version = BUILD_INFO.semver_version();
loop {
interval.tick().await;
let latest_version = match report_one(&config).await {
Ok(latest_version) => latest_version,
Err(e) => {
debug!("failed to report telemetry: {}", e);
continue;
}
};
if latest_version > reported_version {
match BUILD_INFO.semver_version().pre.as_str() {
"dev" => {
debug!(
"a new version of materialized is available: {}",
latest_version
);
}
_ => {
warn!(
"a new version of materialized is available: {}",
latest_version
);
}
};
reported_version = latest_version;
}
}
}
fn make_telemetry_query(config: &Config) -> String {
let architecture = std::env::consts::ARCH;
let os = std::env::consts::OS;
format!("
SELECT jsonb_build_object(
'version', mz_version(),
'status', jsonb_build_object(
'session_id', mz_internal.mz_session_id(),
'uptime_seconds', extract(epoch FROM mz_uptime()),
'num_workers', {workers},
'architecture', '{architecture}',
'os', '{os}',
'sources', (
SELECT jsonb_object_agg(connector_type, jsonb_build_object('count', count))
FROM (SELECT connector_type, count(*) FROM mz_sources WHERE id LIKE 'u%' GROUP BY connector_type)
),
'tables', jsonb_build_object('count', (SELECT count(*) FROM mz_tables WHERE id LIKE 'u%')),
'views', jsonb_build_object('count', (SELECT count(*) FROM mz_views WHERE id LIKE 'u%')),
'sinks', (
SELECT jsonb_object_agg(connector_type, jsonb_build_object('count', count))
FROM (SELECT connector_type, count(*) FROM mz_sinks WHERE id LIKE 'u%' GROUP BY connector_type)
)
)
)",
workers = config.workers
)
}
#[derive(Deserialize)]
struct V1VersionResponse {
latest_release: String,
}
async fn report_one(config: &Config) -> Result<semver::Version, anyhow::Error> {
let response: V1VersionResponse = Retry::default()
.initial_backoff(Duration::from_secs(1))
.max_duration(config.interval)
.retry_async(|_state| async {
let query_result = config
.coord_client
.system_execute_one(&make_telemetry_query(config))
.await?;
let response = mz_http_proxy::reqwest::client()
.post(format!(
"https://{}/api/telemetry/{}",
config.domain, config.cluster_id
))
.timeout(Duration::from_secs(10))
.json(&query_result.rows[0][0])
.send()
.await?
.error_for_status()?;
Ok::<_, anyhow::Error>(response.json().await?)
})
.await?;
Ok(response.latest_release.parse()?)
}