1use std::collections::{BTreeMap, BTreeSet};
18
19use mz_adapter_types::dyncfgs::{OIDC_GROUP_ROLE_SYNC_ENABLED, OIDC_GROUP_ROLE_SYNC_STRICT};
20use mz_repr::role_id::RoleId;
21use mz_sql::session::user::MZ_JWT_SYNC_ROLE_ID;
22use tokio::sync::mpsc;
23use tracing::{info, warn};
24
25use crate::AdapterError;
26use crate::catalog::{self, Op};
27use crate::coord::Coordinator;
28use crate::notice::AdapterNotice;
29
30#[derive(Debug, Clone)]
32pub struct GroupSyncDiff {
33 pub grants: Vec<Op>,
35 pub revokes: Vec<Op>,
37}
38
39pub fn compute_group_sync_diff(
56 member_id: RoleId,
57 current_membership: &BTreeMap<RoleId, RoleId>,
58 target_role_ids: &BTreeSet<RoleId>,
59) -> GroupSyncDiff {
60 let mut sync_granted: BTreeSet<RoleId> = BTreeSet::new();
62 let mut manual_granted: BTreeSet<RoleId> = BTreeSet::new();
63
64 for (role_id, grantor_id) in current_membership {
65 if *grantor_id == MZ_JWT_SYNC_ROLE_ID {
66 sync_granted.insert(*role_id);
67 } else {
68 manual_granted.insert(*role_id);
69 }
70 }
71
72 let grants: Vec<Op> = target_role_ids
74 .iter()
75 .filter(|r| !sync_granted.contains(r) && !manual_granted.contains(r))
76 .map(|&role_id| Op::GrantRole {
77 role_id,
78 member_id,
79 grantor_id: MZ_JWT_SYNC_ROLE_ID,
80 })
81 .collect();
82
83 let revokes: Vec<Op> = sync_granted
85 .iter()
86 .filter(|r| !target_role_ids.contains(r))
87 .map(|&role_id| Op::RevokeRole {
88 role_id,
89 member_id,
90 grantor_id: MZ_JWT_SYNC_ROLE_ID,
91 })
92 .collect();
93
94 GroupSyncDiff { grants, revokes }
95}
96
97impl Coordinator {
98 pub(crate) async fn maybe_sync_jwt_groups(
109 &mut self,
110 member_id: RoleId,
111 groups: Option<&[String]>,
112 notice_tx: &mpsc::UnboundedSender<AdapterNotice>,
113 ) -> Result<(), AdapterError> {
114 let groups = match groups {
115 Some(g) => g,
116 None => return Ok(()),
117 };
118
119 let dyncfgs = self.catalog().system_config().dyncfgs();
120 let sync_enabled = OIDC_GROUP_ROLE_SYNC_ENABLED.get(dyncfgs);
121 let strict = OIDC_GROUP_ROLE_SYNC_STRICT.get(dyncfgs);
122
123 if !sync_enabled {
124 return Ok(());
125 }
126
127 let mut notices = Vec::new();
128 match self.sync_jwt_groups(member_id, groups, &mut notices).await {
129 Ok(()) => {}
130 Err(e) => {
131 if strict {
132 return Err(AdapterError::OidcGroupSyncFailed(e.to_string()));
133 } else {
134 warn!(
135 error = %e,
136 "OIDC group sync failed, proceeding with login (fail-open mode)"
137 );
138 notices.push(AdapterNotice::OidcGroupSyncError {
139 message: e.to_string(),
140 });
141 }
142 }
143 }
144 for notice in notices {
145 let _ = notice_tx.send(notice);
146 }
147 Ok(())
148 }
149
150 pub(crate) async fn sync_jwt_groups(
160 &mut self,
161 member_id: RoleId,
162 groups: &[String],
163 notices: &mut Vec<AdapterNotice>,
164 ) -> Result<(), AdapterError> {
165 let mut target_role_ids = BTreeSet::new();
167 for group in groups {
168 if catalog::is_reserved_role_name(&group.to_lowercase()) {
171 warn!(
172 group = group.as_str(),
173 "OIDC group maps to reserved role name, skipping"
174 );
175 notices.push(AdapterNotice::OidcGroupSyncReservedRole {
176 group: group.clone(),
177 });
178 continue;
179 }
180
181 match self.catalog().try_get_role_by_name(group) {
182 Some(role) => {
183 if role.id == member_id {
188 info!(
189 group = group.as_str(),
190 "OIDC group maps to the user's own role, skipping"
191 );
192 continue;
193 }
194 target_role_ids.insert(role.id);
195 }
196 None => {
197 info!(
198 group = group.as_str(),
199 "OIDC group has no matching Materialize role, skipping"
200 );
201 notices.push(AdapterNotice::OidcGroupSyncUnmatchedGroup {
202 group: group.clone(),
203 });
204 }
205 }
206 }
207
208 let current_membership = self.catalog().get_role(&member_id).membership.map.clone();
212
213 let diff = compute_group_sync_diff(member_id, ¤t_membership, &target_role_ids);
215
216 if diff.grants.is_empty() && diff.revokes.is_empty() {
218 return Ok(());
219 }
220
221 let mut ops = diff.revokes;
223 ops.extend(diff.grants);
224
225 self.catalog_transact(None, ops).await?;
226
227 Ok(())
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use crate::catalog::is_reserved_role_name;
235
236 fn user_id() -> RoleId {
237 RoleId::User(100)
238 }
239 fn role_a() -> RoleId {
240 RoleId::User(1)
241 }
242 fn role_b() -> RoleId {
243 RoleId::User(2)
244 }
245 fn role_c() -> RoleId {
246 RoleId::User(3)
247 }
248 fn admin_id() -> RoleId {
249 RoleId::User(99)
250 }
251
252 fn grant_role_ids(diff: &GroupSyncDiff) -> BTreeSet<RoleId> {
254 diff.grants
255 .iter()
256 .map(|op| match op {
257 Op::GrantRole { role_id, .. } => *role_id,
258 _ => panic!("expected GrantRole op"),
259 })
260 .collect()
261 }
262
263 fn revoke_role_ids(diff: &GroupSyncDiff) -> BTreeSet<RoleId> {
265 diff.revokes
266 .iter()
267 .map(|op| match op {
268 Op::RevokeRole { role_id, .. } => *role_id,
269 _ => panic!("expected RevokeRole op"),
270 })
271 .collect()
272 }
273
274 fn assert_grants_well_formed(diff: &GroupSyncDiff, expected_member: RoleId) {
276 for op in &diff.grants {
277 match op {
278 Op::GrantRole {
279 member_id,
280 grantor_id,
281 ..
282 } => {
283 assert_eq!(*member_id, expected_member);
284 assert_eq!(*grantor_id, MZ_JWT_SYNC_ROLE_ID);
285 }
286 _ => panic!("expected GrantRole op"),
287 }
288 }
289 }
290
291 fn assert_revokes_well_formed(diff: &GroupSyncDiff, expected_member: RoleId) {
293 for op in &diff.revokes {
294 match op {
295 Op::RevokeRole {
296 member_id,
297 grantor_id,
298 ..
299 } => {
300 assert_eq!(*member_id, expected_member);
301 assert_eq!(*grantor_id, MZ_JWT_SYNC_ROLE_ID);
302 }
303 _ => panic!("expected RevokeRole op"),
304 }
305 }
306 }
307
308 #[mz_ore::test]
309 fn test_first_login_grants_all() {
310 let current = BTreeMap::new();
311 let target = BTreeSet::from([role_a(), role_b()]);
312 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
313
314 assert_eq!(diff.grants.len(), 2);
315 assert_eq!(diff.revokes.len(), 0);
316 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_a(), role_b()]));
317 assert_grants_well_formed(&diff, user_id());
318 }
319
320 #[mz_ore::test]
321 fn test_no_change_is_noop() {
322 let current = BTreeMap::from([
323 (role_a(), MZ_JWT_SYNC_ROLE_ID),
324 (role_b(), MZ_JWT_SYNC_ROLE_ID),
325 ]);
326 let target = BTreeSet::from([role_a(), role_b()]);
327 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
328
329 assert_eq!(diff.grants.len(), 0);
330 assert_eq!(diff.revokes.len(), 0);
331 }
332
333 #[mz_ore::test]
334 fn test_revoke_removed_groups() {
335 let current = BTreeMap::from([
336 (role_a(), MZ_JWT_SYNC_ROLE_ID),
337 (role_b(), MZ_JWT_SYNC_ROLE_ID),
338 (role_c(), MZ_JWT_SYNC_ROLE_ID),
339 ]);
340 let target = BTreeSet::from([role_a()]);
341 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
342
343 assert_eq!(diff.grants.len(), 0);
344 assert_eq!(diff.revokes.len(), 2);
345 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_b(), role_c()]));
346 assert_revokes_well_formed(&diff, user_id());
347 }
348
349 #[mz_ore::test]
350 fn test_manual_grants_untouched() {
351 let current = BTreeMap::from([(role_a(), admin_id()), (role_b(), MZ_JWT_SYNC_ROLE_ID)]);
355 let target = BTreeSet::from([role_a(), role_c()]);
356 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
357
358 assert_eq!(diff.grants.len(), 1);
359 assert_eq!(diff.revokes.len(), 1);
360 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_c()]));
361 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_b()]));
362 }
363
364 #[mz_ore::test]
365 fn test_empty_target_revokes_all_sync() {
366 let current = BTreeMap::from([
369 (role_a(), MZ_JWT_SYNC_ROLE_ID),
370 (role_b(), MZ_JWT_SYNC_ROLE_ID),
371 (role_c(), admin_id()),
372 ]);
373 let target = BTreeSet::new();
374 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
375
376 assert_eq!(diff.grants.len(), 0);
377 assert_eq!(diff.revokes.len(), 2);
378 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_a(), role_b()]));
379 }
380
381 #[mz_ore::test]
382 fn test_mixed_grant_and_revoke() {
383 let current = BTreeMap::from([(role_a(), MZ_JWT_SYNC_ROLE_ID)]);
386 let target = BTreeSet::from([role_b()]);
387 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
388
389 assert_eq!(diff.grants.len(), 1);
390 assert_eq!(diff.revokes.len(), 1);
391 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_b()]));
392 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_a()]));
393 }
394
395 #[mz_ore::test]
396 fn test_empty_current_empty_target() {
397 let current = BTreeMap::new();
398 let target = BTreeSet::new();
399 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
400
401 assert_eq!(diff.grants.len(), 0);
402 assert_eq!(diff.revokes.len(), 0);
403 }
404
405 #[mz_ore::test]
406 fn test_all_manual_grants_no_revokes() {
407 let current = BTreeMap::from([(role_a(), admin_id()), (role_b(), admin_id())]);
410 let target = BTreeSet::from([role_c()]);
411 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
412
413 assert_eq!(diff.grants.len(), 1);
414 assert_eq!(diff.revokes.len(), 0);
415 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_c()]));
416 }
417
418 #[mz_ore::test]
419 fn test_target_overlaps_both_manual_and_sync() {
420 let role_d = RoleId::User(4);
423 let current = BTreeMap::from([
424 (role_a(), admin_id()),
425 (role_b(), MZ_JWT_SYNC_ROLE_ID),
426 (role_c(), MZ_JWT_SYNC_ROLE_ID),
427 ]);
428 let target = BTreeSet::from([role_a(), role_b(), role_d]);
429 let diff = compute_group_sync_diff(user_id(), ¤t, &target);
430
431 assert_eq!(diff.grants.len(), 1);
432 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_d]));
433 assert_eq!(diff.revokes.len(), 1);
434 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_c()]));
435 }
436
437 #[mz_ore::test]
442 fn test_reserved_role_mz_prefix() {
443 assert!(is_reserved_role_name("mz_system"));
444 assert!(is_reserved_role_name("mz_introspection"));
445 assert!(is_reserved_role_name("mz_jwt_sync"));
446 assert!(is_reserved_role_name("mz_anything"));
447 }
448
449 #[mz_ore::test]
450 fn test_reserved_role_pg_prefix() {
451 assert!(is_reserved_role_name("pg_monitor"));
452 assert!(is_reserved_role_name("pg_read_all_data"));
453 }
454
455 #[mz_ore::test]
456 fn test_reserved_role_public() {
457 assert!(is_reserved_role_name("PUBLIC"));
458 }
459
460 #[mz_ore::test]
461 fn test_normal_role_names_not_reserved() {
462 assert!(!is_reserved_role_name("analytics"));
463 assert!(!is_reserved_role_name("platform_eng"));
464 assert!(!is_reserved_role_name("admin"));
465 assert!(!is_reserved_role_name("data_eng"));
466 assert!(!is_reserved_role_name("mzz_custom"));
468 assert!(!is_reserved_role_name("pga_custom"));
469 }
470
471 #[mz_ore::test]
477 fn test_all_reserved_filtered_results_in_empty_target() {
478 let current = BTreeMap::from([
481 (role_a(), MZ_JWT_SYNC_ROLE_ID),
482 (role_b(), MZ_JWT_SYNC_ROLE_ID),
483 ]);
484 let target = BTreeSet::new(); let diff = compute_group_sync_diff(user_id(), ¤t, &target);
486
487 assert_eq!(diff.grants.len(), 0);
488 assert_eq!(diff.revokes.len(), 2);
489 assert_eq!(revoke_role_ids(&diff), BTreeSet::from([role_a(), role_b()]));
490 }
491
492 #[mz_ore::test]
493 fn test_mixed_reserved_and_valid_after_filtering() {
494 let current = BTreeMap::new();
498 let target = BTreeSet::from([role_a()]); let diff = compute_group_sync_diff(user_id(), ¤t, &target);
500
501 assert_eq!(diff.grants.len(), 1);
502 assert_eq!(diff.revokes.len(), 0);
503 assert_eq!(grant_role_ids(&diff), BTreeSet::from([role_a()]));
504 }
505}