1use std::collections::{BTreeMap, BTreeSet};
18
19use mz_adapter_types::dyncfgs::{OIDC_GROUP_ROLE_SYNC_ENABLED, OIDC_GROUP_ROLE_SYNC_STRICT};
20use mz_repr::role_id::RoleId;
21use mz_sql::session::user::MZ_JWT_SYNC_ROLE_ID;
22use tokio::sync::mpsc;
23use tracing::{info, warn};
24
25use crate::AdapterError;
26use crate::catalog::{self, Op};
27use crate::coord::Coordinator;
28use crate::notice::AdapterNotice;
29
30#[derive(Debug, Clone)]
32pub struct GroupSyncDiff {
33 pub grants: Vec<Op>,
35 pub revokes: Vec<Op>,
37}
38
39pub fn compute_group_sync_diff(
56 member_id: RoleId,
57 current_membership: &BTreeMap<RoleId, RoleId>,
58 target_role_ids: &BTreeSet<RoleId>,
59) -> GroupSyncDiff {
60 let mut sync_granted: BTreeSet<RoleId> = BTreeSet::new();
62 let mut manual_granted: BTreeSet<RoleId> = BTreeSet::new();
63
64 for (role_id, grantor_id) in current_membership {
65 if *grantor_id == MZ_JWT_SYNC_ROLE_ID {
66 sync_granted.insert(*role_id);
67 } else {
68 manual_granted.insert(*role_id);
69 }
70 }
71
72 let grants: Vec<Op> = target_role_ids
74 .iter()
75 .filter(|r| !sync_granted.contains(r) && !manual_granted.contains(r))
76 .map(|&role_id| Op::GrantRole {
77 role_id,
78 member_id,
79 grantor_id: MZ_JWT_SYNC_ROLE_ID,
80 })
81 .collect();
82
83 let revokes: Vec<Op> = sync_granted
85 .iter()
86 .filter(|r| !target_role_ids.contains(r))
87 .map(|&role_id| Op::RevokeRole {
88 role_id,
89 member_id,
90 grantor_id: MZ_JWT_SYNC_ROLE_ID,
91 })
92 .collect();
93
94 GroupSyncDiff { grants, revokes }
95}
96
97impl Coordinator {
98 pub(crate) async fn maybe_sync_jwt_groups(
109 &mut self,
110 member_id: RoleId,
111 groups: Option<&[String]>,
112 notice_tx: &mpsc::UnboundedSender<AdapterNotice>,
113 ) -> Result<(), AdapterError> {
114 let groups = match groups {
115 Some(g) => g,
116 None => return Ok(()),
117 };
118
119 let dyncfgs = self.catalog().system_config().dyncfgs();
120 let sync_enabled = OIDC_GROUP_ROLE_SYNC_ENABLED.get(dyncfgs);
121 let strict = OIDC_GROUP_ROLE_SYNC_STRICT.get(dyncfgs);
122
123 if !sync_enabled {
124 return Ok(());
125 }
126
127 let mut notices = Vec::new();
128 match self.sync_jwt_groups(member_id, groups, &mut notices).await {
129 Ok(()) => {}
130 Err(e) => {
131 if strict {
132 return Err(AdapterError::OidcGroupSyncFailed(e.to_string()));
133 } else {
134 warn!(
135 error = %e,
136 "OIDC group sync failed, proceeding with login (fail-open mode)"
137 );
138 notices.push(AdapterNotice::OidcGroupSyncError {
139 message: e.to_string(),
140 });
141 }
142 }
143 }
144 for notice in notices {
145 let _ = notice_tx.send(notice);
146 }
147 Ok(())
148 }
149
150 pub(crate) async fn sync_jwt_groups(
160 &mut self,
161 member_id: RoleId,
162 groups: &[String],
163 notices: &mut Vec<AdapterNotice>,
164 ) -> Result<(), AdapterError> {
165 let role_map = self.catalog().roles_by_lowercase_name();
166
167 let mut target_role_ids = BTreeSet::new();
169 for group in groups {
170 if catalog::is_reserved_role_name(group) {
172 warn!(
173 group = group.as_str(),
174 "OIDC group maps to reserved role name, skipping"
175 );
176 notices.push(AdapterNotice::OidcGroupSyncReservedRole {
177 group: group.clone(),
178 });
179 continue;
180 }
181
182 match role_map.get(&group.to_lowercase()).copied() {
183 Some(role) => {
184 if role.id == member_id {
189 info!(
190 group = group.as_str(),
191 "OIDC group maps to the user's own role, skipping"
192 );
193 continue;
194 }
195 target_role_ids.insert(role.id);
196 }
197 None => {
198 info!(
199 group = group.as_str(),
200 "OIDC group has no matching Materialize role, skipping"
201 );
202 notices.push(AdapterNotice::OidcGroupSyncUnmatchedGroup {
203 group: group.clone(),
204 });
205 }
206 }
207 }
208
209 let current_membership = self.catalog().get_role(&member_id).membership.map.clone();
213
214 let diff = compute_group_sync_diff(member_id, ¤t_membership, &target_role_ids);
216
217 if diff.grants.is_empty() && diff.revokes.is_empty() {
219 return Ok(());
220 }
221
222 let mut ops = diff.revokes;
224 ops.extend(diff.grants);
225
226 self.catalog_transact(None, ops).await?;
227
228 Ok(())
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use crate::catalog::is_reserved_role_name;
236
237 fn user_id() -> RoleId {
238 RoleId::User(100)
239 }
240 fn role_a() -> RoleId {
241 RoleId::User(1)
242 }
243 fn role_b() -> RoleId {
244 RoleId::User(2)
245 }
246 fn role_c() -> RoleId {
247 RoleId::User(3)
248 }
249 fn admin_id() -> RoleId {
250 RoleId::User(99)
251 }
252
253 fn grant_role_ids(diff: &GroupSyncDiff) -> BTreeSet<RoleId> {
255 diff.grants
256 .iter()
257 .map(|op| match op {
258 Op::GrantRole { role_id, .. } => *role_id,
259 _ => panic!("expected GrantRole op"),
260 })
261 .collect()
262 }
263
264 fn revoke_role_ids(diff: &GroupSyncDiff) -> BTreeSet<RoleId> {
266 diff.revokes
267 .iter()
268 .map(|op| match op {
269 Op::RevokeRole { role_id, .. } => *role_id,
270 _ => panic!("expected RevokeRole op"),
271 })
272 .collect()
273 }
274
275 fn assert_grants_well_formed(diff: &GroupSyncDiff, expected_member: RoleId) {
277 for op in &diff.grants {
278 match op {
279 Op::GrantRole {
280 member_id,
281 grantor_id,
282 ..
283 } => {
284 assert_eq!(*member_id, expected_member);
285 assert_eq!(*grantor_id, MZ_JWT_SYNC_ROLE_ID);
286 }
287 _ => panic!("expected GrantRole op"),
288 }
289 }
290 }
291
292 fn assert_revokes_well_formed(diff: &GroupSyncDiff, expected_member: RoleId) {
294 for op in &diff.revokes {
295 match op {
296 Op::RevokeRole {
297 member_id,
298 grantor_id,
299 ..
300 } => {
301 assert_eq!(*member_id, expected_member);
302 assert_eq!(*grantor_id, MZ_JWT_SYNC_ROLE_ID);
303 }
304 _ => panic!("expected RevokeRole op"),
305 }
306 }
307 }
308
309 #[mz_ore::test]
310 fn test_first_login_grants_all() {
311 let current = BTreeMap::new();
312 let target = BTreeSet::from([role_a(), role_b()]);
313 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
314
315 assert_eq!(diff.grants.len(), 2);
316 assert_eq!(diff.revokes.len(), 0);
317 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_a(), role_b()]));
318 assert_grants_well_formed(&diff, user_id());
319 }
320
321 #[mz_ore::test]
322 fn test_no_change_is_noop() {
323 let current = BTreeMap::from([
324 (role_a(), MZ_JWT_SYNC_ROLE_ID),
325 (role_b(), MZ_JWT_SYNC_ROLE_ID),
326 ]);
327 let target = BTreeSet::from([role_a(), role_b()]);
328 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
329
330 assert_eq!(diff.grants.len(), 0);
331 assert_eq!(diff.revokes.len(), 0);
332 }
333
334 #[mz_ore::test]
335 fn test_revoke_removed_groups() {
336 let current = BTreeMap::from([
337 (role_a(), MZ_JWT_SYNC_ROLE_ID),
338 (role_b(), MZ_JWT_SYNC_ROLE_ID),
339 (role_c(), MZ_JWT_SYNC_ROLE_ID),
340 ]);
341 let target = BTreeSet::from([role_a()]);
342 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
343
344 assert_eq!(diff.grants.len(), 0);
345 assert_eq!(diff.revokes.len(), 2);
346 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_b(), role_c()]));
347 assert_revokes_well_formed(&diff, user_id());
348 }
349
350 #[mz_ore::test]
351 fn test_manual_grants_untouched() {
352 let current = BTreeMap::from([(role_a(), admin_id()), (role_b(), MZ_JWT_SYNC_ROLE_ID)]);
356 let target = BTreeSet::from([role_a(), role_c()]);
357 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
358
359 assert_eq!(diff.grants.len(), 1);
360 assert_eq!(diff.revokes.len(), 1);
361 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_c()]));
362 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_b()]));
363 }
364
365 #[mz_ore::test]
366 fn test_empty_target_revokes_all_sync() {
367 let current = BTreeMap::from([
370 (role_a(), MZ_JWT_SYNC_ROLE_ID),
371 (role_b(), MZ_JWT_SYNC_ROLE_ID),
372 (role_c(), admin_id()),
373 ]);
374 let target = BTreeSet::new();
375 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
376
377 assert_eq!(diff.grants.len(), 0);
378 assert_eq!(diff.revokes.len(), 2);
379 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_a(), role_b()]));
380 }
381
382 #[mz_ore::test]
383 fn test_mixed_grant_and_revoke() {
384 let current = BTreeMap::from([(role_a(), MZ_JWT_SYNC_ROLE_ID)]);
387 let target = BTreeSet::from([role_b()]);
388 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
389
390 assert_eq!(diff.grants.len(), 1);
391 assert_eq!(diff.revokes.len(), 1);
392 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_b()]));
393 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_a()]));
394 }
395
396 #[mz_ore::test]
397 fn test_empty_current_empty_target() {
398 let current = BTreeMap::new();
399 let target = BTreeSet::new();
400 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
401
402 assert_eq!(diff.grants.len(), 0);
403 assert_eq!(diff.revokes.len(), 0);
404 }
405
406 #[mz_ore::test]
407 fn test_all_manual_grants_no_revokes() {
408 let current = BTreeMap::from([(role_a(), admin_id()), (role_b(), admin_id())]);
411 let target = BTreeSet::from([role_c()]);
412 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
413
414 assert_eq!(diff.grants.len(), 1);
415 assert_eq!(diff.revokes.len(), 0);
416 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_c()]));
417 }
418
419 #[mz_ore::test]
420 fn test_target_overlaps_both_manual_and_sync() {
421 let role_d = RoleId::User(4);
424 let current = BTreeMap::from([
425 (role_a(), admin_id()),
426 (role_b(), MZ_JWT_SYNC_ROLE_ID),
427 (role_c(), MZ_JWT_SYNC_ROLE_ID),
428 ]);
429 let target = BTreeSet::from([role_a(), role_b(), role_d]);
430 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
431
432 assert_eq!(diff.grants.len(), 1);
433 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_d]));
434 assert_eq!(diff.revokes.len(), 1);
435 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_c()]));
436 }
437
438 #[mz_ore::test]
443 fn test_reserved_role_mz_prefix() {
444 assert!(is_reserved_role_name("mz_system"));
445 assert!(is_reserved_role_name("mz_introspection"));
446 assert!(is_reserved_role_name("mz_jwt_sync"));
447 assert!(is_reserved_role_name("mz_anything"));
448 }
449
450 #[mz_ore::test]
451 fn test_reserved_role_pg_prefix() {
452 assert!(is_reserved_role_name("pg_monitor"));
453 assert!(is_reserved_role_name("pg_read_all_data"));
454 }
455
456 #[mz_ore::test]
457 fn test_reserved_role_public() {
458 assert!(is_reserved_role_name("PUBLIC"));
459 }
460
461 #[mz_ore::test]
462 fn test_normal_role_names_not_reserved() {
463 assert!(!is_reserved_role_name("analytics"));
464 assert!(!is_reserved_role_name("platform_eng"));
465 assert!(!is_reserved_role_name("admin"));
466 assert!(!is_reserved_role_name("data_eng"));
467 assert!(!is_reserved_role_name("mzz_custom"));
469 assert!(!is_reserved_role_name("pga_custom"));
470 }
471
472 #[mz_ore::test]
478 fn test_all_reserved_filtered_results_in_empty_target() {
479 let current = BTreeMap::from([
482 (role_a(), MZ_JWT_SYNC_ROLE_ID),
483 (role_b(), MZ_JWT_SYNC_ROLE_ID),
484 ]);
485 let target = BTreeSet::new(); let diff = compute_group_sync_diff(user_id(), ¤t, &target);
487
488 assert_eq!(diff.grants.len(), 0);
489 assert_eq!(diff.revokes.len(), 2);
490 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_a(), role_b()]));
491 }
492
493 #[mz_ore::test]
494 fn test_mixed_reserved_and_valid_after_filtering() {
495 let current = BTreeMap::new();
499 let target = BTreeSet::from([role_a()]); let diff = compute_group_sync_diff(user_id(), ¤t, &target);
501
502 assert_eq!(diff.grants.len(), 1);
503 assert_eq!(diff.revokes.len(), 0);
504 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_a()]));
505 }
506}