mz_frontegg_auth/auth.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::num::NonZeroUsize;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use anyhow::Context as _;
18use derivative::Derivative;
19use futures::FutureExt;
20use futures::future::Shared;
21use jsonwebtoken::{Algorithm, DecodingKey, Validation};
22use lru::LruCache;
23use mz_auth::Authenticated;
24use mz_ore::instrument;
25use mz_ore::metrics::MetricsRegistry;
26use mz_ore::now::NowFn;
27use mz_ore::time::DurationExt;
28use mz_repr::user::ExternalUserMetadata;
29use serde::{Deserialize, Serialize};
30use tokio::sync::watch;
31use tokio::time;
32use uuid::Uuid;
33
34use crate::metrics::Metrics;
35use crate::{ApiTokenArgs, AppPassword, Client, Error, FronteggCliArgs};
36
/// Default number of [`AppPassword`]s for which the most recent session drop time is tracked.
pub const DEFAULT_REFRESH_DROP_LRU_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(1024) {
    Some(size) => size,
    // 1024 is non-zero, so this arm is never taken. Because the constant is
    // evaluated at compile time, a zero value here would fail the build
    // instead of panicking at runtime, so no `unsafe` is needed.
    None => panic!("DEFAULT_REFRESH_DROP_LRU_CACHE_SIZE must be non-zero"),
};

/// Default size of the "recently dropped" window, as a fraction of a token's validity.
///
/// When a session's refresh comes due and no handles to it are alive, the session is still
/// refreshed if a handle to it was dropped within `DEFAULT_REFRESH_DROP_FACTOR * valid_for` of
/// the refresh. Here `valid_for` is the token's remaining validity when the refresh was
/// scheduled. The assumption is that a new instance of this session will be started soon.
pub const DEFAULT_REFRESH_DROP_FACTOR: f64 = 0.05;

/// The maximum length of a user name, in bytes.
pub const MAX_USER_NAME_LENGTH: usize = 255;
48
/// Configures an [`Authenticator`].
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct AuthenticatorConfig {
    /// URL for the token endpoint, including full path.
    pub admin_api_token_url: String,
    /// JWK used to validate JWTs.
    ///
    /// Omitted from the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub decoding_key: DecodingKey,
    /// Optional tenant id used to validate JWTs.
    ///
    /// When set, tokens whose `tenantId` claim differs are rejected. When `None`, the tenant
    /// is not checked.
    pub tenant_id: Option<Uuid>,
    /// Function to provide system time to validate exp (expires at) field of JWTs.
    pub now: NowFn,
    /// Name of admin role.
    ///
    /// A user is considered an administrator when their token's `roles` claim contains this
    /// value.
    pub admin_role: String,
    /// How many [`AppPassword`]s we'll track the last dropped time for.
    ///
    /// This is the capacity of the LRU cache holding the drop times.
    ///
    /// TODO(parkmycar): Wire this up to LaunchDarkly.
    pub refresh_drop_lru_size: NonZeroUsize,
    /// How large of a window we'll use for determining if a session was dropped "recently", and if
    /// we should refresh the session, even if there are not any active handles to it.
    ///
    /// Expressed as a fraction of the token's remaining validity.
    ///
    /// TODO(parkmycar): Wire this up to LaunchDarkly.
    pub refresh_drop_factor: f64,
}
74
/// Facilitates authenticating users via Frontegg, and verifying returned JWTs.
///
/// Cloning is cheap. All clones share the same underlying state (configuration, session
/// cache, and metrics) through an [`Arc`].
#[derive(Clone, Debug)]
pub struct Authenticator {
    inner: Arc<AuthenticatorInner>,
}
80
81impl Authenticator {
82 /// Creates a new authenticator.
83 pub fn new(config: AuthenticatorConfig, client: Client, registry: &MetricsRegistry) -> Self {
84 let mut validation = Validation::new(Algorithm::RS256);
85
86 // We validate the token expiration with our own now function.
87 validation.validate_exp = false;
88
89 // We don't validate the audience because:
90 //
91 // 1. We don't have easy access to the expected audience ID here.
92 //
93 // 2. There is no meaningful security improvement to doing so, because
94 // Frontegg always sets the audience to the ID of the workspace
95 // that issued the token. Since we only trust the signing keys from
96 // a single Frontegg workspace, the audience is redundant.
97 //
98 // See this conversation [0] from the Materialize–Frontegg shared Slack
99 // channel on 1 January 2024.
100 //
101 // NOTE we do validate that the tenantId claim matches the expected tenant_id
102 // in order to ensure that the JWT was created for the specific environment.
103 //
104 // [0]: https://materializeinc.slack.com/archives/C02940WNMRQ/p1704131331041669
105 validation.validate_aud = false;
106
107 let metrics = Metrics::register_into(registry);
108 let active_sessions = Mutex::new(BTreeMap::new());
109 let dropped_sessions = Mutex::new(LruCache::new(config.refresh_drop_lru_size));
110
111 Authenticator {
112 inner: Arc::new(AuthenticatorInner {
113 admin_api_token_url: config.admin_api_token_url,
114 client,
115 validation,
116 decoding_key: config.decoding_key,
117 tenant_id: config.tenant_id,
118 admin_role: config.admin_role,
119 now: config.now,
120 active_sessions,
121 dropped_sessions,
122 refresh_drop_factor: config.refresh_drop_factor,
123 metrics,
124 }),
125 }
126 }
127
128 /// Create an [`Authenticator`] from [`FronteggCliArgs`].
129 pub fn from_args(
130 args: FronteggCliArgs,
131 registry: &MetricsRegistry,
132 ) -> Result<Option<Self>, Error> {
133 let config = match (
134 args.frontegg_tenant,
135 args.frontegg_api_token_url,
136 args.frontegg_admin_role,
137 ) {
138 (None, None, None) => {
139 return Ok(None);
140 }
141 (Some(tenant_id), Some(admin_api_token_url), Some(admin_role)) => {
142 let decoding_key = match (args.frontegg_jwk, args.frontegg_jwk_file) {
143 (None, Some(path)) => {
144 let jwk = std::fs::read(&path)
145 .with_context(|| format!("reading {path:?} for --frontegg-jwk-file"))?;
146 DecodingKey::from_rsa_pem(&jwk)?
147 }
148 (Some(jwk), None) => DecodingKey::from_rsa_pem(jwk.as_bytes())?,
149 _ => {
150 return Err(anyhow::anyhow!(
151 "expected exactly one of --frontegg-jwk or --frontegg-jwk-file"
152 )
153 .into());
154 }
155 };
156 AuthenticatorConfig {
157 admin_api_token_url,
158 decoding_key,
159 tenant_id: Some(tenant_id),
160 now: mz_ore::now::SYSTEM_TIME.clone(),
161 admin_role,
162 refresh_drop_lru_size: DEFAULT_REFRESH_DROP_LRU_CACHE_SIZE,
163 refresh_drop_factor: DEFAULT_REFRESH_DROP_FACTOR,
164 }
165 }
166 _ => unreachable!("clap enforced"),
167 };
168 let client = Client::environmentd_default();
169
170 Ok(Some(Self::new(config, client, registry)))
171 }
172
173 /// Establishes a new authentication session.
174 ///
175 /// If successful, returns a handle to the authentication session.
176 /// Otherwise, returns the authentication error.
177 pub async fn authenticate(
178 &self,
179 expected_user: &str,
180 password: &str,
181 group_claim: Option<&str>,
182 ) -> Result<(AuthSessionHandle, Authenticated), Error> {
183 let password: AppPassword = password.parse()?;
184 match self
185 .authenticate_inner(expected_user, password, group_claim)
186 .await
187 {
188 Ok(handle) => {
189 tracing::debug!("authentication successful");
190 Ok((handle, Authenticated))
191 }
192 Err(e) => {
193 tracing::debug!(error = ?e, "authentication failed");
194 Err(e)
195 }
196 }
197 }
198
199 #[instrument(level = "debug", fields(client_id = %password.client_id))]
200 async fn authenticate_inner(
201 &self,
202 expected_user: &str,
203 password: AppPassword,
204 group_claim: Option<&str>,
205 ) -> Result<AuthSessionHandle, Error> {
206 let request = {
207 let mut sessions = self.inner.active_sessions.lock().expect("lock poisoned");
208 match sessions.get_mut(&password) {
209 // We have an existing session for this app password.
210 Some(AuthSession::Active {
211 ident,
212 external_metadata_tx,
213 groups_tx,
214 }) => {
215 tracing::debug!(?password.client_id, "joining active session");
216
217 validate_user(&ident.user, expected_user)?;
218 self.inner
219 .metrics
220 .session_request_count
221 .with_label_values(&["active"])
222 .inc();
223
224 // Return a handle to the existing session.
225 return Ok(AuthSessionHandle {
226 ident: Arc::clone(ident),
227 external_metadata_rx: external_metadata_tx.subscribe(),
228 groups_rx: groups_tx.subscribe(),
229 authenticator: Arc::clone(&self.inner),
230 app_password: password,
231 });
232 }
233
234 // We have an in flight request to establish a session.
235 Some(AuthSession::Pending(request)) => {
236 // Latch on to the existing session.
237 tracing::debug!(?password.client_id, "joining pending session");
238 self.inner
239 .metrics
240 .session_request_count
241 .with_label_values(&["pending"])
242 .inc();
243 request.clone()
244 }
245
246 // We do not have an existing session for this API key.
247 None => {
248 tracing::debug!(?password.client_id, "starting new session");
249
250 // Prepare the request to create a new session.
251 let request: Pin<Box<AuthFuture>> = Box::pin({
252 let inner = Arc::clone(&self.inner);
253 let expected_user = String::from(expected_user);
254 let group_claim = group_claim.map(String::from);
255 async move {
256 let result = inner
257 .authenticate(expected_user, password, group_claim)
258 .await;
259
260 // Make sure our AuthSession state is correct.
261 //
262 // Note: We're quite defensive here because this has been a source of
263 // bugs in the past.
264 let mut sessions = inner.active_sessions.lock().expect("lock poisoned");
265 if let Err(err) = &result {
266 let session = sessions.remove(&password);
267 tracing::debug!(?err, ?session, "removing failed auth session");
268 } else {
269 // If the request succeeds, make sure our state is what we expect.
270 match sessions.get(&password) {
271 // Expected State.
272 Some(AuthSession::Active { .. }) => (),
273 // Invalid! The AuthSession should have become Active.
274 None | Some(AuthSession::Pending(_)) => {
275 tracing::error!(
276 ?password.client_id,
277 "failed to make auth session active!"
278 );
279 sessions.remove(&password);
280 }
281 }
282 }
283
284 result
285 }
286 });
287
288 // Store the future so that future requests can latch on.
289 let request = request.shared();
290 sessions.insert(password, AuthSession::Pending(request.clone()));
291 self.inner
292 .metrics
293 .session_request_count
294 .with_label_values(&["new"])
295 .inc();
296
297 // Make sure there is always something driving the request to completion
298 // incase the client goes away.
299 mz_ore::task::spawn(|| "auth-session-listener", {
300 let request = request.clone();
301 async move {
302 // We don't care about the result here, someone else handles it.
303 let _ = request.await;
304 }
305 });
306
307 // Wait for the request to complete.
308 request
309 }
310 }
311 };
312 request.await
313 }
314
315 /// Validates an access token, returning the validated claims.
316 ///
317 /// The following validations are always performed:
318 ///
319 /// * The token is not expired, according to the `Authentication`'s clock.
320 ///
321 /// * The tenant ID in the token matches the `Authentication`'s tenant ID.
322 ///
323 /// If `expected_user` is provided, the token's user name is additionally
324 /// validated to match `expected_user`.
325 pub fn validate_access_token(
326 &self,
327 token: &str,
328 expected_user: Option<&str>,
329 group_claim: Option<&str>,
330 ) -> Result<(ValidatedClaims, Authenticated), Error> {
331 let claims = self
332 .inner
333 .validate_access_token(token, expected_user, group_claim)?;
334 Ok((claims, Authenticated))
335 }
336}
337
/// A handle to an authentication session.
///
/// An authentication session represents a duration of time during which a
/// user's authentication is known to be valid.
///
/// An authentication session begins with a successful API key exchange with
/// Frontegg. While there is at least one outstanding handle to the session, the
/// session's metadata and validity are refreshed with Frontegg at a regular
/// interval. The session ends when all outstanding handles are dropped and the
/// refresh interval is reached. The exception is a handle dropped recently enough
/// (within the authenticator's drop window): the session is then still refreshed
/// in anticipation of reuse.
///
/// Dropping a handle, including any clone of it, records the drop time with the
/// authenticator.
///
/// [`AuthSessionHandle::external_metadata_rx`] can be used to receive events if
/// the session's metadata is updated.
///
/// [`AuthSessionHandle::expired`] can be used to learn if the session has
/// failed to refresh the validity of the API key.
#[derive(Debug, Clone)]
pub struct AuthSessionHandle {
    /// Identity (user name and tenant) of the session, shared by all handles to it.
    ident: Arc<AuthSessionIdent>,
    /// Receiver for the session user's external metadata. The channel is closed when the
    /// session expires.
    external_metadata_rx: watch::Receiver<ExternalUserMetadata>,
    /// Receiver for group memberships, updated on every token refresh so
    /// callers reading via [`AuthSessionHandle::groups`] see at-most-one-refresh
    /// stale data even when joining a long-lived cached session.
    groups_rx: watch::Receiver<Option<Vec<String>>>,
    /// Hold a handle to the [`AuthenticatorInner`] so we can record when this session was dropped.
    authenticator: Arc<AuthenticatorInner>,
    /// Used to record when the session linked with this [`AppPassword`] was dropped.
    app_password: AppPassword,
}
367
368impl AuthSessionHandle {
369 /// Returns the name of the user that created the session.
370 pub fn user(&self) -> &str {
371 &self.ident.user
372 }
373
374 /// Returns the ID of the tenant that created the session.
375 pub fn tenant_id(&self) -> Uuid {
376 self.ident.tenant_id
377 }
378
379 /// Returns the groups extracted from the most recently refreshed JWT's
380 /// group claim, used for JWT group-to-role sync. `None` if the claim was
381 /// absent; `Some(vec![])` if present but empty; `Some(vec![...])`
382 /// otherwise. Refreshed by the background task on every token renewal so
383 /// cached sessions observe bounded-staleness group membership changes.
384 pub fn groups(&self) -> Option<Vec<String>> {
385 self.groups_rx.borrow().clone()
386 }
387
388 /// Mints a receiver for updates to the session user's external metadata.
389 pub fn external_metadata_rx(&self) -> watch::Receiver<ExternalUserMetadata> {
390 self.external_metadata_rx.clone()
391 }
392
393 /// Completes when the authentication session has expired.
394 pub async fn expired(&mut self) {
395 // We piggyback on the external metadata channel to determine session
396 // expiration. The external metadata channel is closed when the session
397 // expires.
398 let _ = self.external_metadata_rx.wait_for(|_| false).await;
399 }
400}
401
402impl Drop for AuthSessionHandle {
403 fn drop(&mut self) {
404 self.authenticator.record_dropped_session(self.app_password);
405 }
406}
407
/// State shared by an [`Authenticator`], its clones, and every [`AuthSessionHandle`] it issues.
#[derive(Derivative)]
#[derivative(Debug)]
struct AuthenticatorInner {
    /// Frontegg API fields.
    ///
    /// URL of the token endpoint that app passwords are exchanged against.
    admin_api_token_url: String,
    /// Client used to exchange app passwords for access tokens.
    client: Client,
    /// JWT decoding and validation fields.
    ///
    /// RS256 validation settings. `exp` is checked manually against `now`, and `aud` is not
    /// validated (see [`Authenticator::new`]).
    validation: Validation,
    /// Key used to verify JWT signatures. Omitted from the `Debug` output.
    #[derivative(Debug = "ignore")]
    decoding_key: DecodingKey,
    /// If set, tokens must carry this tenant ID.
    tenant_id: Option<Uuid>,
    /// Role name whose presence in a token's `roles` claim marks the user as an admin.
    admin_role: String,
    /// Clock used for token expiry checks and refresh scheduling.
    now: NowFn,
    /// Session tracking.
    ///
    /// Pending or active session for each [`AppPassword`], so concurrent and later requests
    /// can share one token exchange.
    active_sessions: Mutex<BTreeMap<AppPassword, AuthSession>>,
    /// Most recent time at which a session created with an [`AppPassword`] was dropped.
    ///
    /// We track when a session was dropped to handle the case of many one-shot queries being
    /// issued in rapid succession. If it comes time to refresh an auth token, and there are no
    /// currently alive sessions, but one was recently dropped, we'll pre-emptively refresh to get
    /// ahead of another session being created with the same [`AppPassword`].
    dropped_sessions: Mutex<LruCache<AppPassword, Instant>>,
    /// How large of a window we'll use for determining if a session was dropped "recently", and if
    /// we should refresh the session, even if there are not any active handles to it.
    refresh_drop_factor: f64,
    /// Metrics.
    metrics: Metrics,
}
436
437impl AuthenticatorInner {
438 async fn authenticate(
439 self: &Arc<Self>,
440 expected_user: String,
441 password: AppPassword,
442 group_claim: Option<String>,
443 ) -> Result<AuthSessionHandle, Error> {
444 // Attempt initial app password exchange.
445 let mut claims = self
446 .exchange_app_password(&expected_user, password, group_claim.as_deref())
447 .await?;
448
449 // Prep session information.
450 let ident = Arc::new(AuthSessionIdent {
451 user: claims.user.clone(),
452 tenant_id: claims.tenant_id,
453 });
454 let external_metadata = claims.to_external_user_metadata();
455 let (external_metadata_tx, external_metadata_rx) = watch::channel(external_metadata);
456 let external_metadata_tx = Arc::new(external_metadata_tx);
457 let (groups_tx, groups_rx) = watch::channel(claims.groups.clone());
458 let groups_tx = Arc::new(groups_tx);
459
460 // Store session to make it available for future requests to latch on
461 // to.
462 {
463 let mut sessions = self.active_sessions.lock().expect("lock poisoned");
464 sessions.insert(
465 password,
466 AuthSession::Active {
467 ident: Arc::clone(&ident),
468 external_metadata_tx: Arc::clone(&external_metadata_tx),
469 groups_tx: Arc::clone(&groups_tx),
470 },
471 );
472 }
473
474 // Start background refresh task.
475 let name = format!("frontegg-auth-refresh-{}", password.client_id);
476 mz_ore::task::spawn(|| name, {
477 let inner = Arc::clone(self);
478 async move {
479 tracing::debug!(?password.client_id, "starting refresh task");
480 inner.metrics.refresh_tasks_active.inc();
481
482 loop {
483 let valid_for = Duration::try_from_secs_i64(claims.exp - inner.now.as_secs())
484 .unwrap_or(Duration::from_secs(60));
485
486 // If we have no outstanding handling to this session, but a handle was dropped
487 // within this window, then we'll still refresh.
488 let drop_window = valid_for
489 .saturating_mul_f64(inner.refresh_drop_factor)
490 .max(Duration::from_secs(1));
491 // Scale the validity duration by 0.8. The Frontegg Python
492 // SDK scales the expires_in this way.
493 //
494 // <https://github.com/frontegg/python-sdk/blob/840f8318aced35cea6a41d83270597edfceb4019/frontegg/common/frontegg_authenticator.py#L45>
495 let valid_for = valid_for.saturating_mul_f64(0.8);
496
497 if valid_for < Duration::from_secs(60) {
498 tracing::warn!(?valid_for, "unexpectedly low token validity");
499 }
500
501 tracing::debug!(
502 ?valid_for,
503 ?drop_window,
504 "waiting for token validity period"
505 );
506
507 // Wait out validity duration.
508 time::sleep(valid_for).await;
509
510 // Check to see if all external metadata receivers have gone away, or if a
511 // session created with this password was recently dropped. If no one is
512 // listening nor any recent handles were dropped we can clean up the session.
513 let receiver_count = external_metadata_tx.receiver_count();
514 let last_drop = inner.last_dropped_session(&password);
515 let recent_drop = last_drop
516 .map(|dropped_at| dropped_at.elapsed() <= drop_window)
517 .unwrap_or(false);
518 if receiver_count == 0 && !recent_drop {
519 tracing::debug!(
520 ?last_drop,
521 ?password.client_id,
522 "all listeners have dropped and none of them were recent!"
523 );
524 break;
525 }
526
527 let outstanding_receivers = bool_as_str(receiver_count > 0);
528 inner
529 .metrics
530 .session_refresh_count
531 .with_label_values(&[outstanding_receivers, bool_as_str(recent_drop)])
532 .inc();
533 tracing::debug!(
534 receiver_count,
535 ?last_drop,
536 ?password.client_id,
537 "refreshing due to interest in the session"
538 );
539
540 // We still have interest, attempt to refresh the session.
541 let res = inner
542 .exchange_app_password(&expected_user, password, group_claim.as_deref())
543 .await;
544 claims = match res {
545 Ok(claims) => {
546 tracing::debug!("refresh successful");
547 claims
548 }
549 Err(e) => {
550 tracing::warn!(error = ?e, "refresh failed");
551 break;
552 }
553 };
554 external_metadata_tx.send_replace(ExternalUserMetadata {
555 admin: claims.is_admin,
556 user_id: claims.user_id,
557 });
558 groups_tx.send_replace(claims.groups.clone());
559 }
560
561 // The session has expired. Clean up the state.
562 {
563 let mut sessions = inner.active_sessions.lock().expect("lock poisoned");
564 sessions.remove(&password);
565 }
566 {
567 let mut dropped_session = inner.dropped_sessions.lock().expect("lock poisoned");
568 dropped_session.pop(&password);
569 }
570
571 tracing::debug!(?password.client_id, "shutting down refresh task");
572 inner.metrics.refresh_tasks_active.dec();
573 }
574 });
575
576 // Return handle to session.
577 Ok(AuthSessionHandle {
578 ident,
579 external_metadata_rx,
580 groups_rx,
581 authenticator: Arc::clone(self),
582 app_password: password,
583 })
584 }
585
586 #[instrument]
587 async fn exchange_app_password(
588 &self,
589 expected_user: &str,
590 password: AppPassword,
591 group_claim: Option<&str>,
592 ) -> Result<ValidatedClaims, Error> {
593 let req = ApiTokenArgs {
594 client_id: password.client_id,
595 secret: password.secret_key,
596 };
597 let res = self
598 .client
599 .exchange_client_secret_for_token(req, &self.admin_api_token_url, &self.metrics)
600 .await?;
601 self.validate_access_token(&res.access_token, Some(expected_user), group_claim)
602 }
603
604 fn validate_access_token(
605 &self,
606 token: &str,
607 expected_user: Option<&str>,
608 group_claim: Option<&str>,
609 ) -> Result<ValidatedClaims, Error> {
610 let msg = jsonwebtoken::decode::<Claims>(token, &self.decoding_key, &self.validation)?;
611 if msg.claims.exp < self.now.as_secs() {
612 return Err(Error::TokenExpired);
613 }
614 if let Some(expected_tenant_id) = self.tenant_id {
615 if msg.claims.tenant_id != expected_tenant_id {
616 return Err(Error::UnauthorizedTenant);
617 }
618 }
619
620 let user = msg.claims.user()?;
621
622 if let Some(expected_user) = expected_user {
623 validate_user(user, expected_user)?;
624 }
625
626 // Resolve the configured group claim path against the JWT for
627 // group-to-role sync. Skip extraction entirely if the caller doesn't
628 // need groups (e.g. balancerd, which only routes by tenant_id).
629 let groups = group_claim.and_then(|p| msg.claims.groups(p));
630
631 Ok(ValidatedClaims {
632 exp: msg.claims.exp,
633 user: user.to_string(),
634 user_id: msg.claims.user_id()?,
635 tenant_id: msg.claims.tenant_id,
636 // The user is an administrator if they have the admin role that the
637 // `Authenticator` has been configured with.
638 is_admin: msg.claims.roles.contains(&self.admin_role),
639 groups,
640 _private: (),
641 })
642 }
643
644 /// Records an [`AuthSessionHandle`] that was recently dropped.
645 fn record_dropped_session(&self, app_password: AppPassword) {
646 let now = Instant::now();
647 let Ok(mut dropped_sessions) = self.dropped_sessions.lock() else {
648 return;
649 };
650 dropped_sessions.push(app_password, now);
651 }
652
653 /// Returns the instant that an [`AuthSessionHandle`] created with the provided [`AppPassword`]
654 /// was last dropped.
655 fn last_dropped_session(&self, app_password: &AppPassword) -> Option<Instant> {
656 let Ok(dropped_sessions) = self.dropped_sessions.lock() else {
657 return None;
658 };
659 dropped_sessions.peek(app_password).copied()
660 }
661}
662
/// The future that establishes a new session.
///
/// It is boxed, pinned, and wrapped in [`Shared`] so that concurrent requests for the same
/// [`AppPassword`] can all await the same exchange.
type AuthFuture = dyn Future<Output = Result<AuthSessionHandle, Error>> + Send;
664
/// The state of the session associated with a single [`AppPassword`].
#[derive(Derivative)]
#[derivative(Debug)]
enum AuthSession {
    /// An initial app password exchange is in flight. Other requests for the same
    /// password latch on to this shared future.
    Pending(Shared<Pin<Box<AuthFuture>>>),
    /// The session is established. New requests subscribe to its channels.
    Active {
        /// Identity shared with every handle to this session.
        ident: Arc<AuthSessionIdent>,
        /// Sender for the user's external metadata. New handles subscribe to it.
        external_metadata_tx: Arc<watch::Sender<ExternalUserMetadata>>,
        /// Groups extracted from the most recent JWT's group claim. Updated
        /// on every token refresh by the background task, so cached sessions
        /// see bounded-staleness group membership changes (the staleness
        /// window is the token refresh cadence).
        groups_tx: Arc<watch::Sender<Option<Vec<String>>>>,
    },
}
679
/// Identity of an authenticated session, shared by all of its handles.
#[derive(Debug)]
struct AuthSessionIdent {
    /// Name of the user the session was authenticated as.
    user: String,
    /// ID of the tenant the session belongs to.
    tenant_id: Uuid,
}
685
/// The type of a JWT issued by Frontegg.
///
/// Variants are (de)serialized in camelCase, e.g. `userToken`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ClaimTokenType {
    /// A user token.
    ///
    /// This type of token is issued when logging in via username and password.
    /// This does *not* include app passwords--those are API tokens under the
    /// hood. This type of token is typically only used by the Materialize
    /// console, as it requires SSO.
    UserToken,
    /// A user API token.
    UserApiToken,
    /// A tenant API token.
    TenantApiToken,
}
702
/// Metadata embedded in a Frontegg JWT.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ClaimMetadata {
    /// The user name to use, for tokens of type `TenantApiToken`.
    ///
    /// The name must not contain `@`; see [`Claims::user`].
    pub user: Option<String>,
}
710
/// The raw claims encoded in a Frontegg access token.
///
/// Consult the JSON Web Token specification and the Frontegg documentation to
/// determine the precise semantics of these fields.
///
/// Field names are (de)serialized in camelCase (e.g. `user_id` ↔ `userId`). The exception
/// is `token_type`, which maps to `type`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Claims {
    /// The "subject" of the token.
    ///
    /// For tokens of type `UserToken`, this is the ID of the Frontegg user
    /// itself. For tokens of type `UserApiToken` and `TenantApiToken`, this
    /// is the client ID of the API token.
    pub sub: Uuid,
    /// The time at which the claims expire, represented in seconds since the
    /// Unix epoch.
    pub exp: i64,
    /// The "issuer" of the token.
    ///
    /// This is always the domain associated with the Frontegg workspace.
    pub iss: String,
    /// The type of API token.
    #[serde(rename = "type")]
    pub token_type: ClaimTokenType,
    /// For tokens of type `UserToken` and `UserApiToken`, the email address
    /// of the authenticated user.
    pub email: Option<String>,
    /// For tokens of type `UserApiToken`, the ID of the authenticated user.
    pub user_id: Option<Uuid>,
    /// The ID of the authenticated tenant.
    pub tenant_id: Uuid,
    /// The IDs of the roles granted by the token.
    pub roles: Vec<String>,
    /// The IDs of the permissions granted by the token.
    pub permissions: Vec<String>,
    /// Metadata embedded in the JWT.
    pub metadata: Option<ClaimMetadata>,
    /// JWT claims that aren't captured by the strongly-typed fields above.
    /// The dyncfg-configurable `GROUP_CLAIM` path is resolved against this
    /// bag (e.g. the default `groups` claim is read from here via
    /// [`Claims::groups`]).
    #[serde(flatten)]
    pub unknown_claims: BTreeMap<String, serde_json::Value>,
}
754
755impl Claims {
756 /// Returns the name of the user associated with the token.
757 pub fn user(&self) -> Result<&str, Error> {
758 match self.token_type {
759 // Use the email as the username for user tokens.
760 ClaimTokenType::UserToken | ClaimTokenType::UserApiToken => {
761 self.email.as_deref().ok_or(Error::MissingClaims)
762 }
763 // The user associated with a tenant API token is configured when
764 // the token is created and passed in the `metadata.user` claim.
765 ClaimTokenType::TenantApiToken => {
766 let user = self
767 .metadata
768 .as_ref()
769 .and_then(|m| m.user.as_deref())
770 .ok_or(Error::MissingClaims)?;
771 if is_email(user) {
772 return Err(Error::InvalidTenantApiTokenUser);
773 }
774 Ok(user)
775 }
776 }
777 }
778
779 /// Returns the ID of the user associated with the token.
780 pub fn user_id(&self) -> Result<Uuid, Error> {
781 match self.token_type {
782 // The `sub` claim stores the ID of the user.
783 ClaimTokenType::UserToken => Ok(self.sub),
784 // Unlike user tokens, the `sub` claim stores the client ID of the
785 // API token. The user ID is passed in the dedicated `user_id`
786 // claim.
787 ClaimTokenType::UserApiToken => self.user_id.ok_or(Error::MissingClaims),
788 // The best user ID for a tenant API token is the client ID of the
789 // tenant API token, as the tokens are not associated with a
790 // Frontegg user.
791 ClaimTokenType::TenantApiToken => Ok(self.sub),
792 }
793 }
794
795 /// Returns the user's group memberships extracted from the JWT claim at
796 /// `claim_path` for group-to-role sync. See
797 /// [`mz_auth::group_claims::extract_groups`] for semantics.
798 pub fn groups(&self, claim_path: &str) -> Option<Vec<String>> {
799 mz_auth::group_claims::extract_groups(&self.unknown_claims, claim_path)
800 }
801}
802
/// [`Claims`] that have been validated by
/// [`Authenticator::validate_access_token`].
#[derive(Clone, Debug)]
pub struct ValidatedClaims {
    /// The time at which the claims expire, represented in seconds since the
    /// Unix epoch.
    pub exp: i64,
    /// The ID of the authenticated user.
    pub user_id: Uuid,
    /// The name of the authenticated user.
    ///
    /// For tokens of type `UserToken` or `UserApiToken`, this is the email
    /// address of the authenticated user. For tokens of type `TenantApiToken`,
    /// this is the `user` field of the token's `metadata` claim.
    pub user: String,
    /// The ID of the tenant the user is authenticated for.
    pub tenant_id: Uuid,
    /// Whether the authenticated user is an administrator, i.e. whether the token's roles
    /// include the configured admin role.
    pub is_admin: bool,
    /// Groups extracted from the configured JWT group claim. `None` if the
    /// claim is absent (skip sync); `Some([])` if present but empty (revoke
    /// all sync-granted roles); `Some([...])` otherwise.
    pub groups: Option<Vec<String>>,
    // Prevent construction outside of `Authenticator::validate_access_token`.
    _private: (),
}
829
830impl ValidatedClaims {
831 /// Constructs an [`ExternalUserMetadata`] from the claims data.
832 fn to_external_user_metadata(&self) -> ExternalUserMetadata {
833 ExternalUserMetadata {
834 admin: self.is_admin,
835 user_id: self.user_id,
836 }
837 }
838}
839
/// Reports whether a username is an email address.
///
/// This is deliberately permissive. It returns `true` for anything that could possibly be
/// an email address, while returning `false` for the wide range of strings usable as
/// service user names.
fn is_email(user: &str) -> bool {
    // Every email address MUST contain an `@`. Forbidding `@` in service user names is an
    // acceptable restriction in exchange. Scanning bytes is equivalent to scanning chars
    // here: `@` is ASCII, and UTF-8 continuation bytes never equal 0x40.
    user.bytes().any(|byte| byte == b'@')
}
852
853fn validate_user(user: &str, expected_user: &str) -> Result<(), Error> {
854 // Impose a maximum length on user names for sanity.
855 if user.len() > MAX_USER_NAME_LENGTH {
856 return Err(Error::UserNameTooLong);
857 }
858
859 let valid = match is_email(expected_user) {
860 false => user == expected_user,
861 // To match Frontegg, email addresses are compared case insensitively.
862 //
863 // NOTE(benesch): we could save some allocations by using `unicase::eq`
864 // here, but the `unicase` crate has had some critical correctness bugs that
865 // make it scary to use in such security-sensitive code.
866 //
867 // See: https://github.com/seanmonstar/unicase/pull/39
868 true => user.to_lowercase() == expected_user.to_lowercase(),
869 };
870 match valid {
871 false => Err(Error::WrongUser),
872 true => Ok(()),
873 }
874}
875
/// Renders a boolean as a static string, e.g. for use as a metric label value.
const fn bool_as_str(x: bool) -> &'static str {
    match x {
        true => "true",
        false => "false",
    }
}
879
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds minimal `UserToken` [`Claims`] whose extra-claims bag holds the top-level
    /// entries of `unknown`. A non-object `unknown` yields an empty bag.
    fn claims_with(unknown: serde_json::Value) -> Claims {
        let unknown_claims: BTreeMap<String, serde_json::Value> = match unknown {
            serde_json::Value::Object(entries) => entries.into_iter().collect(),
            _ => BTreeMap::new(),
        };
        Claims {
            sub: Uuid::nil(),
            exp: 0,
            iss: String::new(),
            token_type: ClaimTokenType::UserToken,
            email: None,
            user_id: None,
            tenant_id: Uuid::nil(),
            roles: vec![],
            permissions: vec![],
            metadata: None,
            unknown_claims,
        }
    }

    #[mz_ore::test]
    fn groups_absent_returns_none() {
        let groups = claims_with(json!({})).groups("groups");
        assert!(groups.is_none());
    }

    #[mz_ore::test]
    fn groups_empty_returns_some_empty() {
        let groups = claims_with(json!({ "groups": [] })).groups("groups");
        assert_eq!(groups, Some(Vec::new()));
    }

    #[mz_ore::test]
    fn groups_populated_returns_some_values() {
        let groups =
            claims_with(json!({ "groups": ["analytics", "engineering"] })).groups("groups");
        let expected: Vec<String> = ["analytics", "engineering"]
            .into_iter()
            .map(String::from)
            .collect();
        assert_eq!(groups, Some(expected));
    }
}