Module materialize.cloudtest.util.authentication

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, fields

import requests
from requests.exceptions import ConnectionError, ReadTimeout

from materialize.cloudtest.util.common import retry
from materialize.cloudtest.util.jwt_key import fetch_jwt


@dataclass
class AuthConfig:
    """Authentication state handed to code that talks to the environment.

    Instances are built by ``_create_auth`` and may later be overwritten
    field-by-field in place by ``update_auth``.
    """

    # Taken from the "tenantId" entry of the identity response.
    organization_id: str
    # Bearer token returned by ``fetch_jwt``.
    token: str
    # ``_create_auth`` stores the user's email here.
    app_user: str | None
    # ``_create_auth`` stores the "mzp_..." value from ``make_app_password``.
    app_password: str | None

    # Callback run by ``refresh``; it receives this very instance.
    refresh_fn: Callable[[AuthConfig], None]

    # NOTE(review): presumably the sslmode for pgwire connections - confirm
    # against the callers that read it.
    pgwire_ssl_mode: str = "require"
    # ``_create_auth`` always leaves this as ``None``.
    tls_ca_cert_path: str | None = None

    def refresh(self) -> None:
        """Run the stored ``refresh_fn`` callback, passing it this config."""
        callback = self.refresh_fn
        callback(self)


@dataclass
class TestUserConfig:
    """Login details for a test user: email, password and Frontegg host."""

    # The "Test" prefix plus the generated ``__init__`` makes pytest try to
    # collect this class (and warn) in any test module that imports it.
    # The attribute is unannotated, so it is not a dataclass field.
    __test__ = False

    email: str
    password: str
    # Host used to build the identity URLs, without a scheme.
    frontegg_host: str


# Hard-coded organization UUID. Nothing in the code shown here reads it.
# NOTE(review): presumably a fallback organization id for importers of this
# module - confirm against callers.
DEFAULT_ORG_ID = "80b1a04a-2277-11ed-a1ce-5405dbb9e0f7"


# TODO: this retry loop should not be necessary, but we are seeing
# connections getting frequently (but sporadically) interrupted here - we
# should track this down and remove these retries
def create_auth(
    user: TestUserConfig,
    refresh_fn: Callable[[AuthConfig], None],
) -> AuthConfig:
    """Build an ``AuthConfig`` for ``user`` by running ``_create_auth`` under ``retry``.

    ``retry`` is called with the attempt callable, ``5`` and the exception
    types ``ConnectionError`` and ``ReadTimeout`` from ``requests.exceptions``.
    NOTE(review): presumably ``5`` is the maximum number of attempts and the
    list names the errors worth retrying - confirm against ``retry``.
    """

    def attempt() -> AuthConfig:
        return _create_auth(user, refresh_fn)

    auth: AuthConfig = retry(attempt, 5, [ConnectionError, ReadTimeout])
    return auth


def _create_auth(
    user: TestUserConfig,
    refresh_fn: Callable[[AuthConfig], None],
) -> AuthConfig:
    """Log ``user`` in once and assemble the resulting ``AuthConfig``.

    Steps: fetch a JWT via ``fetch_jwt``, look up the user's "tenantId" at the
    identity ``/users/v2/me`` endpoint, then create an app password with
    ``make_app_password``. ``raise_for_status`` raises on an HTTP error
    response from the identity lookup.
    """
    host = user.frontegg_host
    # Hosts starting with 127.0.0.1 are reached over plain HTTP, all others
    # over HTTPS.
    scheme = "http" if host.startswith("127.0.0.1") else "https"

    jwt = fetch_jwt(
        email=user.email,
        password=user.password,
        host=host,
        scheme=scheme,
    )

    me_response = requests.get(
        f"{scheme}://{host}/identity/resources/users/v2/me",
        headers={"authorization": f"Bearer {jwt}"},
        timeout=10,
    )
    me_response.raise_for_status()
    tenant_id = me_response.json()["tenantId"]

    return AuthConfig(
        organization_id=tenant_id,
        token=jwt,
        app_user=user.email,
        app_password=make_app_password(host, jwt, scheme),
        refresh_fn=refresh_fn,
        tls_ca_cert_path=None,
    )


def update_auth(
    user: TestUserConfig,
    auth: AuthConfig,
) -> None:
    """Re-authenticate ``user`` and overwrite ``auth`` in place.

    A fresh config is created with ``create_auth`` (reusing
    ``auth.refresh_fn``) and every dataclass field of it is copied onto the
    existing ``auth`` object, so holders of that object see the new values.
    """
    fresh = create_auth(user, auth.refresh_fn)

    for attr_name in (spec.name for spec in fields(fresh)):
        setattr(auth, attr_name, getattr(fresh, attr_name))


def make_app_password(frontegg_host: str, token: str, scheme: str) -> str:
    """Create an API token for the bearer of ``token`` and format it as an app password.

    POSTs to the ``api-tokens/v1`` endpoint on ``frontegg_host`` and returns
    ``"mzp_"`` followed by the response's "clientId" and "secret" values with
    all dashes removed. ``raise_for_status`` raises on an HTTP error response.
    """
    endpoint = f"{scheme}://{frontegg_host}/identity/resources/users/api-tokens/v1"
    reply = requests.post(
        endpoint,
        json={"description": "e2e test password"},
        headers={"authorization": f"Bearer {token}"},
        timeout=10,
    )
    reply.raise_for_status()

    payload = reply.json()
    # Both values are used with their dashes stripped, client id first.
    pieces = [payload[key].replace("-", "") for key in ("clientId", "secret")]
    return "mzp_" + "".join(pieces)

Functions

def create_auth(user: TestUserConfig, refresh_fn: Callable[[AuthConfig], None]) ‑> AuthConfig
Expand source code Browse git
def create_auth(
    user: TestUserConfig,
    refresh_fn: Callable[[AuthConfig], None],
) -> AuthConfig:
    """Build an ``AuthConfig`` for ``user`` by running ``_create_auth`` under ``retry``.

    ``retry`` is called with the attempt callable, ``5`` and the exception
    types ``ConnectionError`` and ``ReadTimeout`` from ``requests.exceptions``.
    NOTE(review): presumably ``5`` is the maximum number of attempts and the
    list names the errors worth retrying - confirm against ``retry``.
    """

    def attempt() -> AuthConfig:
        return _create_auth(user, refresh_fn)

    auth: AuthConfig = retry(attempt, 5, [ConnectionError, ReadTimeout])
    return auth
def make_app_password(frontegg_host: str, token: str, scheme: str) ‑> str
Expand source code Browse git
def make_app_password(frontegg_host: str, token: str, scheme: str) -> str:
    """Create an API token for the bearer of ``token`` and format it as an app password.

    POSTs to the ``api-tokens/v1`` endpoint on ``frontegg_host`` and returns
    ``"mzp_"`` followed by the response's "clientId" and "secret" values with
    all dashes removed. ``raise_for_status`` raises on an HTTP error response.
    """
    endpoint = f"{scheme}://{frontegg_host}/identity/resources/users/api-tokens/v1"
    reply = requests.post(
        endpoint,
        json={"description": "e2e test password"},
        headers={"authorization": f"Bearer {token}"},
        timeout=10,
    )
    reply.raise_for_status()

    payload = reply.json()
    # Both values are used with their dashes stripped, client id first.
    pieces = [payload[key].replace("-", "") for key in ("clientId", "secret")]
    return "mzp_" + "".join(pieces)
def update_auth(user: TestUserConfig, auth: AuthConfig) ‑> None
Expand source code Browse git
def update_auth(
    user: TestUserConfig,
    auth: AuthConfig,
) -> None:
    """Re-authenticate ``user`` and overwrite ``auth`` in place.

    A fresh config is created with ``create_auth`` (reusing
    ``auth.refresh_fn``) and every dataclass field of it is copied onto the
    existing ``auth`` object, so holders of that object see the new values.
    """
    fresh = create_auth(user, auth.refresh_fn)

    for attr_name in (spec.name for spec in fields(fresh)):
        setattr(auth, attr_name, getattr(fresh, attr_name))

Classes

class AuthConfig (organization_id: str, token: str, app_user: str | None, app_password: str | None, refresh_fn: Callable[[AuthConfig], None], pgwire_ssl_mode: str = 'require', tls_ca_cert_path: str | None = None)

AuthConfig(organization_id: 'str', token: 'str', app_user: 'str | None', app_password: 'str | None', refresh_fn: 'Callable[[AuthConfig], None]', pgwire_ssl_mode: 'str' = 'require', tls_ca_cert_path: 'str | None' = None)

Expand source code Browse git
@dataclass
class AuthConfig:
    """Authentication state handed to code that talks to the environment.

    Instances are built by ``_create_auth`` and may later be overwritten
    field-by-field in place by ``update_auth``.
    """

    # Taken from the "tenantId" entry of the identity response.
    organization_id: str
    # Bearer token returned by ``fetch_jwt``.
    token: str
    # ``_create_auth`` stores the user's email here.
    app_user: str | None
    # ``_create_auth`` stores the "mzp_..." value from ``make_app_password``.
    app_password: str | None

    # Callback run by ``refresh``; it receives this very instance.
    refresh_fn: Callable[[AuthConfig], None]

    # NOTE(review): presumably the sslmode for pgwire connections - confirm
    # against the callers that read it.
    pgwire_ssl_mode: str = "require"
    # ``_create_auth`` always leaves this as ``None``.
    tls_ca_cert_path: str | None = None

    def refresh(self) -> None:
        """Run the stored ``refresh_fn`` callback, passing it this config."""
        callback = self.refresh_fn
        callback(self)

Class variables

var app_password : str | None
var app_user : str | None
var organization_id : str
var pgwire_ssl_mode : str
var refresh_fn : collections.abc.Callable[[AuthConfig], None]
var tls_ca_cert_path : str | None
var token : str

Methods

def refresh(self) ‑> None
Expand source code Browse git
def refresh(self) -> None:
    """Run the stored ``refresh_fn`` callback, passing it this config."""
    callback = self.refresh_fn
    callback(self)
class TestUserConfig (email: str, password: str, frontegg_host: str)

TestUserConfig(email: 'str', password: 'str', frontegg_host: 'str')

Expand source code Browse git
@dataclass
class TestUserConfig:
    """Login details for a test user: email, password and Frontegg host."""

    # The "Test" prefix plus the generated ``__init__`` makes pytest try to
    # collect this class (and warn) in any test module that imports it.
    # The attribute is unannotated, so it is not a dataclass field.
    __test__ = False

    email: str
    password: str
    # Host used to build the identity URLs, without a scheme.
    frontegg_host: str

Class variables

var email : str
var frontegg_host : str
var password : str