Skip to main content

mz_frontegg_auth/
auth.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::future::Future;
12use std::num::NonZeroUsize;
13use std::pin::Pin;
14use std::sync::{Arc, Mutex};
15use std::time::{Duration, Instant};
16
17use anyhow::Context as _;
18use derivative::Derivative;
19use futures::FutureExt;
20use futures::future::Shared;
21use jsonwebtoken::{Algorithm, DecodingKey, Validation};
22use lru::LruCache;
23use mz_auth::Authenticated;
24use mz_ore::instrument;
25use mz_ore::metrics::MetricsRegistry;
26use mz_ore::now::NowFn;
27use mz_ore::time::DurationExt;
28use mz_repr::user::ExternalUserMetadata;
29use serde::{Deserialize, Serialize};
30use tokio::sync::watch;
31use tokio::time;
32use uuid::Uuid;
33
34use crate::metrics::Metrics;
35use crate::{ApiTokenArgs, AppPassword, Client, Error, FronteggCliArgs};
36
/// Default capacity of the LRU cache that remembers, per [`AppPassword`], when a session handle
/// was last dropped.
pub const DEFAULT_REFRESH_DROP_LRU_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(1024) {
    Some(size) => size,
    None => panic!("1024 is non-zero"),
};

/// Default "recently dropped" factor.
///
/// When a token is due for refresh and no handle to its session is alive, the refresh still
/// happens if a handle was dropped within the last `DEFAULT_REFRESH_DROP_FACTOR * valid_for`
/// (where `valid_for` is the token's remaining validity). The expectation is that a new session
/// for the same app password is about to be opened.
pub const DEFAULT_REFRESH_DROP_FACTOR: f64 = 5e-2;

/// Upper bound on the length of a user name.
pub const MAX_USER_NAME_LENGTH: usize = 255;
47
48/// Configures an [`Authenticator`].
49#[derive(Clone, Derivative)]
50#[derivative(Debug)]
51pub struct AuthenticatorConfig {
52    /// URL for the token endpoint, including full path.
53    pub admin_api_token_url: String,
54    /// JWK used to validate JWTs.
55    #[derivative(Debug = "ignore")]
56    pub decoding_key: DecodingKey,
57    /// Optional tenant id used to validate JWTs.
58    pub tenant_id: Option<Uuid>,
59    /// Function to provide system time to validate exp (expires at) field of JWTs.
60    pub now: NowFn,
61    /// Name of admin role.
62    pub admin_role: String,
63    /// How many [`AppPassword`]s we'll track the last dropped time for.
64    ///
65    /// TODO(parkmycar): Wire this up to LaunchDarkly.
66    pub refresh_drop_lru_size: NonZeroUsize,
67    /// How large of a window we'll use for determining if a session was dropped "recently", and if
68    /// we should refresh the session, even if there are not any active handles to it.
69    ///
70    /// TODO(parkmycar): Wire this up to LaunchDarkly.
71    pub refresh_drop_factor: f64,
72}
73
74/// Facilitates authenticating users via Frontegg, and verifying returned JWTs.
75#[derive(Clone, Debug)]
76pub struct Authenticator {
77    inner: Arc<AuthenticatorInner>,
78}
79
80impl Authenticator {
81    /// Creates a new authenticator.
82    pub fn new(config: AuthenticatorConfig, client: Client, registry: &MetricsRegistry) -> Self {
83        let mut validation = Validation::new(Algorithm::RS256);
84
85        // We validate the token expiration with our own now function.
86        validation.validate_exp = false;
87
88        // We don't validate the audience because:
89        //
90        //   1. We don't have easy access to the expected audience ID here.
91        //
92        //   2. There is no meaningful security improvement to doing so, because
93        //      Frontegg always sets the audience to the ID of the workspace
94        //      that issued the token. Since we only trust the signing keys from
95        //      a single Frontegg workspace, the audience is redundant.
96        //
97        // See this conversation [0] from the Materialize–Frontegg shared Slack
98        // channel on 1 January 2024.
99        //
100        // NOTE we do validate that the tenantId claim matches the expected tenant_id
101        // in order to ensure that the JWT was created for the specific environment.
102        //
103        // [0]: https://materializeinc.slack.com/archives/C02940WNMRQ/p1704131331041669
104        validation.validate_aud = false;
105
106        let metrics = Metrics::register_into(registry);
107        let active_sessions = Mutex::new(BTreeMap::new());
108        let dropped_sessions = Mutex::new(LruCache::new(config.refresh_drop_lru_size));
109
110        Authenticator {
111            inner: Arc::new(AuthenticatorInner {
112                admin_api_token_url: config.admin_api_token_url,
113                client,
114                validation,
115                decoding_key: config.decoding_key,
116                tenant_id: config.tenant_id,
117                admin_role: config.admin_role,
118                now: config.now,
119                active_sessions,
120                dropped_sessions,
121                refresh_drop_factor: config.refresh_drop_factor,
122                metrics,
123            }),
124        }
125    }
126
127    /// Create an [`Authenticator`] from [`FronteggCliArgs`].
128    pub fn from_args(
129        args: FronteggCliArgs,
130        registry: &MetricsRegistry,
131    ) -> Result<Option<Self>, Error> {
132        let config = match (
133            args.frontegg_tenant,
134            args.frontegg_api_token_url,
135            args.frontegg_admin_role,
136        ) {
137            (None, None, None) => {
138                return Ok(None);
139            }
140            (Some(tenant_id), Some(admin_api_token_url), Some(admin_role)) => {
141                let decoding_key = match (args.frontegg_jwk, args.frontegg_jwk_file) {
142                    (None, Some(path)) => {
143                        let jwk = std::fs::read(&path)
144                            .with_context(|| format!("reading {path:?} for --frontegg-jwk-file"))?;
145                        DecodingKey::from_rsa_pem(&jwk)?
146                    }
147                    (Some(jwk), None) => DecodingKey::from_rsa_pem(jwk.as_bytes())?,
148                    _ => {
149                        return Err(anyhow::anyhow!(
150                            "expected exactly one of --frontegg-jwk or --frontegg-jwk-file"
151                        )
152                        .into());
153                    }
154                };
155                AuthenticatorConfig {
156                    admin_api_token_url,
157                    decoding_key,
158                    tenant_id: Some(tenant_id),
159                    now: mz_ore::now::SYSTEM_TIME.clone(),
160                    admin_role,
161                    refresh_drop_lru_size: DEFAULT_REFRESH_DROP_LRU_CACHE_SIZE,
162                    refresh_drop_factor: DEFAULT_REFRESH_DROP_FACTOR,
163                }
164            }
165            _ => unreachable!("clap enforced"),
166        };
167        let client = Client::environmentd_default();
168
169        Ok(Some(Self::new(config, client, registry)))
170    }
171
172    /// Establishes a new authentication session.
173    ///
174    /// If successful, returns a handle to the authentication session.
175    /// Otherwise, returns the authentication error.
176    pub async fn authenticate(
177        &self,
178        expected_user: &str,
179        password: &str,
180        group_claim: Option<&str>,
181    ) -> Result<(AuthSessionHandle, Authenticated), Error> {
182        let password: AppPassword = password.parse()?;
183        match self
184            .authenticate_inner(expected_user, password, group_claim)
185            .await
186        {
187            Ok(handle) => {
188                tracing::debug!("authentication successful");
189                Ok((handle, Authenticated))
190            }
191            Err(e) => {
192                tracing::debug!(error = ?e, "authentication failed");
193                Err(e)
194            }
195        }
196    }
197
198    #[instrument(level = "debug", fields(client_id = %password.client_id))]
199    async fn authenticate_inner(
200        &self,
201        expected_user: &str,
202        password: AppPassword,
203        group_claim: Option<&str>,
204    ) -> Result<AuthSessionHandle, Error> {
205        let request = {
206            let mut sessions = self.inner.active_sessions.lock().expect("lock poisoned");
207            match sessions.get_mut(&password) {
208                // We have an existing session for this app password.
209                Some(AuthSession::Active {
210                    ident,
211                    external_metadata_tx,
212                    groups_tx,
213                }) => {
214                    tracing::debug!(?password.client_id, "joining active session");
215
216                    validate_user(&ident.user, expected_user)?;
217                    self.inner
218                        .metrics
219                        .session_request_count
220                        .with_label_values(&["active"])
221                        .inc();
222
223                    // Return a handle to the existing session.
224                    return Ok(AuthSessionHandle {
225                        ident: Arc::clone(ident),
226                        external_metadata_rx: external_metadata_tx.subscribe(),
227                        groups_rx: groups_tx.subscribe(),
228                        authenticator: Arc::clone(&self.inner),
229                        app_password: password,
230                    });
231                }
232
233                // We have an in flight request to establish a session.
234                Some(AuthSession::Pending(request)) => {
235                    // Latch on to the existing session.
236                    tracing::debug!(?password.client_id, "joining pending session");
237                    self.inner
238                        .metrics
239                        .session_request_count
240                        .with_label_values(&["pending"])
241                        .inc();
242                    request.clone()
243                }
244
245                // We do not have an existing session for this API key.
246                None => {
247                    tracing::debug!(?password.client_id, "starting new session");
248
249                    // Prepare the request to create a new session.
250                    let request: Pin<Box<AuthFuture>> = Box::pin({
251                        let inner = Arc::clone(&self.inner);
252                        let expected_user = String::from(expected_user);
253                        let group_claim = group_claim.map(String::from);
254                        async move {
255                            let result = inner
256                                .authenticate(expected_user, password, group_claim)
257                                .await;
258
259                            // Make sure our AuthSession state is correct.
260                            //
261                            // Note: We're quite defensive here because this has been a source of
262                            // bugs in the past.
263                            let mut sessions = inner.active_sessions.lock().expect("lock poisoned");
264                            if let Err(err) = &result {
265                                let session = sessions.remove(&password);
266                                tracing::debug!(?err, ?session, "removing failed auth session");
267                            } else {
268                                // If the request succeeds, make sure our state is what we expect.
269                                match sessions.get(&password) {
270                                    // Expected State.
271                                    Some(AuthSession::Active { .. }) => (),
272                                    // Invalid! The AuthSession should have become Active.
273                                    None | Some(AuthSession::Pending(_)) => {
274                                        tracing::error!(
275                                            ?password.client_id,
276                                            "failed to make auth session active!"
277                                        );
278                                        sessions.remove(&password);
279                                    }
280                                }
281                            }
282
283                            result
284                        }
285                    });
286
287                    // Store the future so that future requests can latch on.
288                    let request = request.shared();
289                    sessions.insert(password, AuthSession::Pending(request.clone()));
290                    self.inner
291                        .metrics
292                        .session_request_count
293                        .with_label_values(&["new"])
294                        .inc();
295
296                    // Make sure there is always something driving the request to completion
297                    // incase the client goes away.
298                    mz_ore::task::spawn(|| "auth-session-listener", {
299                        let request = request.clone();
300                        async move {
301                            // We don't care about the result here, someone else handles it.
302                            let _ = request.await;
303                        }
304                    });
305
306                    // Wait for the request to complete.
307                    request
308                }
309            }
310        };
311        request.await
312    }
313
314    /// Validates an access token, returning the validated claims.
315    ///
316    /// The following validations are always performed:
317    ///
318    ///   * The token is not expired, according to the `Authentication`'s clock.
319    ///
320    ///   * The tenant ID in the token matches the `Authentication`'s tenant ID.
321    ///
322    /// If `expected_user` is provided, the token's user name is additionally
323    /// validated to match `expected_user`.
324    pub fn validate_access_token(
325        &self,
326        token: &str,
327        expected_user: Option<&str>,
328        group_claim: Option<&str>,
329    ) -> Result<(ValidatedClaims, Authenticated), Error> {
330        let claims = self
331            .inner
332            .validate_access_token(token, expected_user, group_claim)?;
333        Ok((claims, Authenticated))
334    }
335}
336
337/// A handle to an authentication session.
338///
339/// An authentication session represents a duration of time during which a
340/// user's authentication is known to be valid.
341///
342/// An authentication session begins with a successful API key exchange with
343/// Frontegg. While there is at least one outstanding handle to the session, the
344/// session's metadata and validity are refreshed with Frontegg at a regular
345/// interval. The session ends when all outstanding handles are dropped and the
346/// refresh interval is reached.
347///
348/// [`AuthSessionHandle::external_metadata_rx`] can be used to receive events if
349/// the session's metadata is updated.
350///
351/// [`AuthSessionHandle::expired`] can be used to learn if the session has
352/// failed to refresh the validity of the API key.
353#[derive(Debug, Clone)]
354pub struct AuthSessionHandle {
355    ident: Arc<AuthSessionIdent>,
356    external_metadata_rx: watch::Receiver<ExternalUserMetadata>,
357    /// Receiver for group memberships, updated on every token refresh so
358    /// callers reading via [`AuthSessionHandle::groups`] see at-most-one-refresh
359    /// stale data even when joining a long-lived cached session.
360    groups_rx: watch::Receiver<Option<Vec<String>>>,
361    /// Hold a handle to the [`AuthenticatorInner`] so we can record when this session was dropped.
362    authenticator: Arc<AuthenticatorInner>,
363    /// Used to record when the session linked with this [`AppPassword`] was dropped.
364    app_password: AppPassword,
365}
366
367impl AuthSessionHandle {
368    /// Returns the name of the user that created the session.
369    pub fn user(&self) -> &str {
370        &self.ident.user
371    }
372
373    /// Returns the ID of the tenant that created the session.
374    pub fn tenant_id(&self) -> Uuid {
375        self.ident.tenant_id
376    }
377
378    /// Returns the groups extracted from the most recently refreshed JWT's
379    /// group claim, used for JWT group-to-role sync. `None` if the claim was
380    /// absent; `Some(vec![])` if present but empty; `Some(vec![...])`
381    /// otherwise. Refreshed by the background task on every token renewal so
382    /// cached sessions observe bounded-staleness group membership changes.
383    pub fn groups(&self) -> Option<Vec<String>> {
384        self.groups_rx.borrow().clone()
385    }
386
387    /// Mints a receiver for updates to the session user's external metadata.
388    pub fn external_metadata_rx(&self) -> watch::Receiver<ExternalUserMetadata> {
389        self.external_metadata_rx.clone()
390    }
391
392    /// Completes when the authentication session has expired.
393    pub async fn expired(&mut self) {
394        // We piggyback on the external metadata channel to determine session
395        // expiration. The external metadata channel is closed when the session
396        // expires.
397        let _ = self.external_metadata_rx.wait_for(|_| false).await;
398    }
399}
400
401impl Drop for AuthSessionHandle {
402    fn drop(&mut self) {
403        self.authenticator.record_dropped_session(self.app_password);
404    }
405}
406
407#[derive(Derivative)]
408#[derivative(Debug)]
409struct AuthenticatorInner {
410    /// Frontegg API fields.
411    admin_api_token_url: String,
412    client: Client,
413    /// JWT decoding and validation fields.
414    validation: Validation,
415    #[derivative(Debug = "ignore")]
416    decoding_key: DecodingKey,
417    tenant_id: Option<Uuid>,
418    admin_role: String,
419    now: NowFn,
420    /// Session tracking.
421    active_sessions: Mutex<BTreeMap<AppPassword, AuthSession>>,
422    /// Most recent time at which a session created with an [`AppPassword`] was dropped.
423    ///
424    /// We track when a session was dropped to handle the case of many one-shot queries being
425    /// issued in rapid succession. If it comes time to refresh an auth token, and there are no
426    /// currently alive sessions, but one was recently dropped, we'll pre-emptively refresh to get
427    /// ahead of another session being created with the same [`AppPassword`].
428    dropped_sessions: Mutex<LruCache<AppPassword, Instant>>,
429    /// How large of a window we'll use for determining if a session was dropped "recently", and if
430    /// we should refresh the session, even if there are not any active handles to it.
431    refresh_drop_factor: f64,
432    /// Metrics.
433    metrics: Metrics,
434}
435
436impl AuthenticatorInner {
437    async fn authenticate(
438        self: &Arc<Self>,
439        expected_user: String,
440        password: AppPassword,
441        group_claim: Option<String>,
442    ) -> Result<AuthSessionHandle, Error> {
443        // Attempt initial app password exchange.
444        let mut claims = self
445            .exchange_app_password(&expected_user, password, group_claim.as_deref())
446            .await?;
447
448        // Prep session information.
449        let ident = Arc::new(AuthSessionIdent {
450            user: claims.user.clone(),
451            tenant_id: claims.tenant_id,
452        });
453        let external_metadata = claims.to_external_user_metadata();
454        let (external_metadata_tx, external_metadata_rx) = watch::channel(external_metadata);
455        let external_metadata_tx = Arc::new(external_metadata_tx);
456        let (groups_tx, groups_rx) = watch::channel(claims.groups.clone());
457        let groups_tx = Arc::new(groups_tx);
458
459        // Store session to make it available for future requests to latch on
460        // to.
461        {
462            let mut sessions = self.active_sessions.lock().expect("lock poisoned");
463            sessions.insert(
464                password,
465                AuthSession::Active {
466                    ident: Arc::clone(&ident),
467                    external_metadata_tx: Arc::clone(&external_metadata_tx),
468                    groups_tx: Arc::clone(&groups_tx),
469                },
470            );
471        }
472
473        // Start background refresh task.
474        let name = format!("frontegg-auth-refresh-{}", password.client_id);
475        mz_ore::task::spawn(|| name, {
476            let inner = Arc::clone(self);
477            async move {
478                tracing::debug!(?password.client_id, "starting refresh task");
479                inner.metrics.refresh_tasks_active.inc();
480
481                loop {
482                    let valid_for = Duration::try_from_secs_i64(claims.exp - inner.now.as_secs())
483                        .unwrap_or(Duration::from_secs(60));
484
485                    // If we have no outstanding handling to this session, but a handle was dropped
486                    // within this window, then we'll still refresh.
487                    let drop_window = valid_for
488                        .saturating_mul_f64(inner.refresh_drop_factor)
489                        .max(Duration::from_secs(1));
490                    // Scale the validity duration by 0.8. The Frontegg Python
491                    // SDK scales the expires_in this way.
492                    //
493                    // <https://github.com/frontegg/python-sdk/blob/840f8318aced35cea6a41d83270597edfceb4019/frontegg/common/frontegg_authenticator.py#L45>
494                    let valid_for = valid_for.saturating_mul_f64(0.8);
495
496                    if valid_for < Duration::from_secs(60) {
497                        tracing::warn!(?valid_for, "unexpectedly low token validity");
498                    }
499
500                    tracing::debug!(
501                        ?valid_for,
502                        ?drop_window,
503                        "waiting for token validity period"
504                    );
505
506                    // Wait out validity duration.
507                    time::sleep(valid_for).await;
508
509                    // Check to see if all external metadata receivers have gone away, or if a
510                    // session created with this password was recently dropped. If no one is
511                    // listening nor any recent handles were dropped we can clean up the session.
512                    let receiver_count = external_metadata_tx.receiver_count();
513                    let last_drop = inner.last_dropped_session(&password);
514                    let recent_drop = last_drop
515                        .map(|dropped_at| dropped_at.elapsed() <= drop_window)
516                        .unwrap_or(false);
517                    if receiver_count == 0 && !recent_drop {
518                        tracing::debug!(
519                            ?last_drop,
520                            ?password.client_id,
521                            "all listeners have dropped and none of them were recent!"
522                        );
523                        break;
524                    }
525
526                    let outstanding_receivers = bool_as_str(receiver_count > 0);
527                    inner
528                        .metrics
529                        .session_refresh_count
530                        .with_label_values(&[outstanding_receivers, bool_as_str(recent_drop)])
531                        .inc();
532                    tracing::debug!(
533                        receiver_count,
534                        ?last_drop,
535                        ?password.client_id,
536                        "refreshing due to interest in the session"
537                    );
538
539                    // We still have interest, attempt to refresh the session.
540                    let res = inner
541                        .exchange_app_password(&expected_user, password, group_claim.as_deref())
542                        .await;
543                    claims = match res {
544                        Ok(claims) => {
545                            tracing::debug!("refresh successful");
546                            claims
547                        }
548                        Err(e) => {
549                            tracing::warn!(error = ?e, "refresh failed");
550                            break;
551                        }
552                    };
553                    external_metadata_tx.send_replace(ExternalUserMetadata {
554                        admin: claims.is_admin,
555                        user_id: claims.user_id,
556                    });
557                    groups_tx.send_replace(claims.groups.clone());
558                }
559
560                // The session has expired. Clean up the state.
561                {
562                    let mut sessions = inner.active_sessions.lock().expect("lock poisoned");
563                    sessions.remove(&password);
564                }
565                {
566                    let mut dropped_session = inner.dropped_sessions.lock().expect("lock poisoned");
567                    dropped_session.pop(&password);
568                }
569
570                tracing::debug!(?password.client_id, "shutting down refresh task");
571                inner.metrics.refresh_tasks_active.dec();
572            }
573        });
574
575        // Return handle to session.
576        Ok(AuthSessionHandle {
577            ident,
578            external_metadata_rx,
579            groups_rx,
580            authenticator: Arc::clone(self),
581            app_password: password,
582        })
583    }
584
585    #[instrument]
586    async fn exchange_app_password(
587        &self,
588        expected_user: &str,
589        password: AppPassword,
590        group_claim: Option<&str>,
591    ) -> Result<ValidatedClaims, Error> {
592        let req = ApiTokenArgs {
593            client_id: password.client_id,
594            secret: password.secret_key,
595        };
596        let res = self
597            .client
598            .exchange_client_secret_for_token(req, &self.admin_api_token_url, &self.metrics)
599            .await?;
600        self.validate_access_token(&res.access_token, Some(expected_user), group_claim)
601    }
602
603    fn validate_access_token(
604        &self,
605        token: &str,
606        expected_user: Option<&str>,
607        group_claim: Option<&str>,
608    ) -> Result<ValidatedClaims, Error> {
609        let msg = jsonwebtoken::decode::<Claims>(token, &self.decoding_key, &self.validation)?;
610        if msg.claims.exp < self.now.as_secs() {
611            return Err(Error::TokenExpired);
612        }
613        if let Some(expected_tenant_id) = self.tenant_id {
614            if msg.claims.tenant_id != expected_tenant_id {
615                return Err(Error::UnauthorizedTenant);
616            }
617        }
618
619        let user = msg.claims.user()?;
620
621        if let Some(expected_user) = expected_user {
622            validate_user(user, expected_user)?;
623        }
624
625        // Resolve the configured group claim path against the JWT for
626        // group-to-role sync. Skip extraction entirely if the caller doesn't
627        // need groups (e.g. balancerd, which only routes by tenant_id).
628        let groups = group_claim.and_then(|p| msg.claims.groups(p));
629
630        Ok(ValidatedClaims {
631            exp: msg.claims.exp,
632            user: user.to_string(),
633            user_id: msg.claims.user_id()?,
634            tenant_id: msg.claims.tenant_id,
635            // The user is an administrator if they have the admin role that the
636            // `Authenticator` has been configured with.
637            is_admin: msg.claims.roles.contains(&self.admin_role),
638            groups,
639            _private: (),
640        })
641    }
642
643    /// Records an [`AuthSessionHandle`] that was recently dropped.
644    fn record_dropped_session(&self, app_password: AppPassword) {
645        let now = Instant::now();
646        let Ok(mut dropped_sessions) = self.dropped_sessions.lock() else {
647            return;
648        };
649        dropped_sessions.push(app_password, now);
650    }
651
652    /// Returns the instant that an [`AuthSessionHandle`] created with the provided [`AppPassword`]
653    /// was last dropped.
654    fn last_dropped_session(&self, app_password: &AppPassword) -> Option<Instant> {
655        let Ok(dropped_sessions) = self.dropped_sessions.lock() else {
656            return None;
657        };
658        dropped_sessions.peek(app_password).copied()
659    }
660}
661
662type AuthFuture = dyn Future<Output = Result<AuthSessionHandle, Error>> + Send;
663
664#[derive(Derivative)]
665#[derivative(Debug)]
666enum AuthSession {
667    Pending(Shared<Pin<Box<AuthFuture>>>),
668    Active {
669        ident: Arc<AuthSessionIdent>,
670        external_metadata_tx: Arc<watch::Sender<ExternalUserMetadata>>,
671        /// Groups extracted from the most recent JWT's group claim. Updated
672        /// on every token refresh by the background task, so cached sessions
673        /// see bounded-staleness group membership changes (the staleness
674        /// window is the token refresh cadence).
675        groups_tx: Arc<watch::Sender<Option<Vec<String>>>>,
676    },
677}
678
679#[derive(Debug)]
680struct AuthSessionIdent {
681    user: String,
682    tenant_id: Uuid,
683}
684
685/// The type of a JWT issued by Frontegg.
686#[derive(Clone, Debug, Serialize, Deserialize)]
687#[serde(rename_all = "camelCase")]
688pub enum ClaimTokenType {
689    /// A user token.
690    ///
691    /// This type of token is issued when logging in via username and password
692    /// This does *not* include app passwords--those are API tokens under the
693    /// hood. This type of token is typically only used by the Materialize
694    /// console, as it requires SSO.
695    UserToken,
696    /// A user API token.
697    UserApiToken,
698    /// A tenant API token.
699    TenantApiToken,
700}
701
702/// Metadata embedded in a Frontegg JWT.
703#[derive(Clone, Debug, Serialize, Deserialize)]
704#[serde(rename_all = "camelCase")]
705pub struct ClaimMetadata {
706    /// The user name to use, for tokens of type `TenantApiToken`.
707    pub user: Option<String>,
708}
709
/// The raw claims encoded in a Frontegg access token.
///
/// Consult the JSON Web Token specification and the Frontegg documentation to
/// determine the precise semantics of these fields.
///
/// Field names map to camelCase JSON keys (e.g. `user_id` is read from
/// `userId`, `tenant_id` from `tenantId`), except where a field carries an
/// explicit `rename`. Any claim not matched by a typed field is collected into
/// `unknown_claims`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Claims {
    /// The "subject" of the token.
    ///
    /// For tokens of type `UserToken`, this is the ID of the Frontegg user
    /// itself. For tokens of type `UserApiToken` and `TenantApiToken`, this
    /// is the client ID of the API token.
    pub sub: Uuid,
    /// The time at which the claims expire, represented in seconds since the
    /// Unix epoch.
    pub exp: i64,
    /// The "issuer" of the token.
    ///
    /// This is always the domain associated with the Frontegg workspace.
    pub iss: String,
    /// The kind of token (user token, user API token, or tenant API token),
    /// read from the `type` claim.
    #[serde(rename = "type")]
    pub token_type: ClaimTokenType,
    /// For tokens of type `UserToken` and `UserApiToken`, the email address
    /// of the authenticated user.
    pub email: Option<String>,
    /// For tokens of type `UserApiToken`, the ID of the authenticated user.
    pub user_id: Option<Uuid>,
    /// The ID of the authenticated tenant.
    pub tenant_id: Uuid,
    /// The IDs of the roles granted by the token.
    pub roles: Vec<String>,
    /// The IDs of the permissions granted by the token.
    pub permissions: Vec<String>,
    /// Metadata embedded in the JWT.
    pub metadata: Option<ClaimMetadata>,
    /// JWT claims that aren't captured by the strongly-typed fields above.
    /// The dyncfg-configurable `GROUP_CLAIM` path is resolved against this
    /// bag (e.g. the default `groups` claim is read from here via
    /// [`Claims::groups`]).
    #[serde(flatten)]
    pub unknown_claims: BTreeMap<String, serde_json::Value>,
}
753
754impl Claims {
755    /// Returns the name of the user associated with the token.
756    pub fn user(&self) -> Result<&str, Error> {
757        match self.token_type {
758            // Use the email as the username for user tokens.
759            ClaimTokenType::UserToken | ClaimTokenType::UserApiToken => {
760                self.email.as_deref().ok_or(Error::MissingClaims)
761            }
762            // The user associated with a tenant API token is configured when
763            // the token is created and passed in the `metadata.user` claim.
764            ClaimTokenType::TenantApiToken => {
765                let user = self
766                    .metadata
767                    .as_ref()
768                    .and_then(|m| m.user.as_deref())
769                    .ok_or(Error::MissingClaims)?;
770                if is_email(user) {
771                    return Err(Error::InvalidTenantApiTokenUser);
772                }
773                Ok(user)
774            }
775        }
776    }
777
778    /// Returns the ID of the user associated with the token.
779    pub fn user_id(&self) -> Result<Uuid, Error> {
780        match self.token_type {
781            // The `sub` claim stores the ID of the user.
782            ClaimTokenType::UserToken => Ok(self.sub),
783            // Unlike user tokens, the `sub` claim stores the client ID of the
784            // API token. The user ID is passed in the dedicated `user_id`
785            // claim.
786            ClaimTokenType::UserApiToken => self.user_id.ok_or(Error::MissingClaims),
787            // The best user ID for a tenant API token is the client ID of the
788            // tenant API token, as the tokens are not associated with a
789            // Frontegg user.
790            ClaimTokenType::TenantApiToken => Ok(self.sub),
791        }
792    }
793
794    /// Returns the user's group memberships extracted from the JWT claim at
795    /// `claim_path` for group-to-role sync. See
796    /// [`mz_auth::group_claims::extract_groups`] for semantics.
797    pub fn groups(&self, claim_path: &str) -> Option<Vec<String>> {
798        mz_auth::group_claims::extract_groups(&self.unknown_claims, claim_path)
799    }
800}
801
/// [`Claims`] that have been validated by
/// [`Authenticator::validate_access_token`].
#[derive(Clone, Debug)]
pub struct ValidatedClaims {
    /// The time at which the claims expire, represented in seconds since the
    /// Unix epoch.
    pub exp: i64,
    /// The ID of the authenticated user.
    pub user_id: Uuid,
    /// The name of the authenticated user.
    ///
    /// For tokens of type `UserToken` or `UserApiToken`, this is the email
    /// address of the authenticated user. For tokens of type `TenantApiToken`,
    /// this is the `user` field of the token's `metadata` claim (see
    /// [`Claims::user`]).
    pub user: String,
    /// The ID of the tenant the user is authenticated for.
    pub tenant_id: Uuid,
    /// Whether the authenticated user is an administrator.
    pub is_admin: bool,
    /// Groups extracted from the configured JWT group claim. `None` if the
    /// claim is absent (skip sync); `Some([])` if present but empty (revoke
    /// all sync-granted roles); `Some([...])` otherwise.
    pub groups: Option<Vec<String>>,
    // Prevent construction outside of `Authenticator::validate_access_token`.
    _private: (),
}
828
829impl ValidatedClaims {
830    /// Constructs an [`ExternalUserMetadata`] from the claims data.
831    fn to_external_user_metadata(&self) -> ExternalUserMetadata {
832        ExternalUserMetadata {
833            admin: self.is_admin,
834            user_id: self.user_id,
835        }
836    }
837}
838
/// Reports whether `user` could be an email address.
///
/// The check is deliberately coarse. It must say `true` for every possible
/// email address, while still saying `false` for a broad range of strings that
/// are usable as service-user names.
///
/// Every email address contains an `@`, so the presence of that character is
/// the whole test. Forbidding `@` in service-user names is an acceptable
/// restriction.
fn is_email(user: &str) -> bool {
    // `@` is ASCII. In UTF-8 an ASCII byte never occurs inside a multi-byte
    // sequence, so a byte search finds exactly the `@` characters.
    user.as_bytes().contains(&b'@')
}
851
852fn validate_user(user: &str, expected_user: &str) -> Result<(), Error> {
853    // Impose a maximum length on user names for sanity.
854    if user.len() > MAX_USER_NAME_LENGTH {
855        return Err(Error::UserNameTooLong);
856    }
857
858    let valid = match is_email(expected_user) {
859        false => user == expected_user,
860        // To match Frontegg, email addresses are compared case insensitively.
861        //
862        // NOTE(benesch): we could save some allocations by using `unicase::eq`
863        // here, but the `unicase` crate has had some critical correctness bugs that
864        // make it scary to use in such security-sensitive code.
865        //
866        // See: https://github.com/seanmonstar/unicase/pull/39
867        true => user.to_lowercase() == expected_user.to_lowercase(),
868    };
869    match valid {
870        false => Err(Error::WrongUser),
871        true => Ok(()),
872    }
873}
874
/// Returns the lowercase textual form of `x` (`"true"` or `"false"`).
/// Being `const`, it can be evaluated at compile time.
const fn bool_as_str(x: bool) -> &'static str {
    match x {
        true => "true",
        false => "false",
    }
}
878
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal `UserToken` [`Claims`] whose `unknown_claims` bag holds
    /// the entries of `unknown`. A non-object `unknown` leaves the bag empty.
    fn claims_with(unknown: serde_json::Value) -> Claims {
        let unknown_claims: BTreeMap<String, serde_json::Value> = match unknown {
            serde_json::Value::Object(map) => map.into_iter().collect(),
            _ => BTreeMap::new(),
        };
        Claims {
            sub: Uuid::nil(),
            exp: 0,
            iss: String::new(),
            token_type: ClaimTokenType::UserToken,
            email: None,
            user_id: None,
            tenant_id: Uuid::nil(),
            roles: Vec::new(),
            permissions: Vec::new(),
            metadata: None,
            unknown_claims,
        }
    }

    #[mz_ore::test]
    fn groups_absent_returns_none() {
        let found = claims_with(json!({})).groups("groups");
        assert_eq!(found, None);
    }

    #[mz_ore::test]
    fn groups_empty_returns_some_empty() {
        let found = claims_with(json!({ "groups": [] })).groups("groups");
        assert_eq!(found, Some(Vec::new()));
    }

    #[mz_ore::test]
    fn groups_populated_returns_some_values() {
        let claims = claims_with(json!({ "groups": ["analytics", "engineering"] }));
        let expected = Vec::from(["analytics", "engineering"].map(String::from));
        assert_eq!(claims.groups("groups"), Some(expected));
    }
}