Module materialize.cloudtest.util.wait

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import logging
import subprocess

from materialize import ui
from materialize.cloudtest import DEFAULT_K8S_CONTEXT_NAME
from materialize.cloudtest.util.print_pods import print_pods
from materialize.ui import UIError

LOGGER = logging.getLogger(__name__)


def wait(
    condition: str,
    resource: str,
    timeout_secs: int = 300,
    context: str = DEFAULT_K8S_CONTEXT_NAME,
    *,
    label: str | None = None,
    namespace: str | None = None,
) -> None:
    """Run ``kubectl wait`` repeatedly until it reports that the condition holds.

    The command is re-run on every iteration of
    ``ui.timeout_loop(timeout_secs, tick=0.1)``. A non-zero exit status from
    kubectl is logged and retried rather than treated as fatal.

    Args:
        condition: Value passed to ``kubectl wait --for``.
        resource: Resource argument passed to ``kubectl wait``.
        timeout_secs: Used both as kubectl's own ``--timeout`` (in seconds)
            and as the budget handed to ``ui.timeout_loop``.
        context: Value passed to ``--context``.
        label: If given, passed to ``--selector``.
        namespace: If given, passed to ``--namespace``.

    Raises:
        UIError: If the loop ends without kubectl reporting success. The
            last ``CalledProcessError`` seen (if any) is included in the
            message and attached as the cause.
    """
    cmd = [
        "kubectl",
        "wait",
        "--for",
        condition,
        resource,
        "--timeout",
        f"{timeout_secs}s",
        "--context",
        context,
    ]

    if label is not None:
        cmd.extend(["--selector", label])

    if namespace is not None:
        cmd.extend(["--namespace", namespace])

    ui.progress(f'waiting for {" ".join(cmd)} ... ')

    last_error: subprocess.CalledProcessError | None = None
    for _ in ui.timeout_loop(timeout_secs, tick=0.1):
        try:
            raw_output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            # use a less verbose output than log_subprocess_error here
            # Decode leniently: a stray non-ASCII byte in kubectl's message
            # must not abort the retry loop with a UnicodeDecodeError.
            LOGGER.info("%s %s", e, e.output.decode("utf-8", errors="replace"))
            last_error = e
            continue

        output = raw_output.decode("utf-8", errors="replace")
        # output is:
        # - an empty string when a 'delete' condition is satisfied
        # - 'condition met' for all other conditions
        if not output or "condition met" in output:
            ui.progress("success!", finish=True)
            return

    ui.progress(finish=True)
    # NOTE(review): presumably dumps the current pod state to aid debugging
    # the timeout -- confirm against print_pods.
    print_pods()
    raise UIError(
        f"kubectl wait never returned 'condition met': {last_error}"
    ) from last_error

Functions

def wait(condition: str, resource: str, timeout_secs: int = 300, context: str = 'kind-mzcloud', *, label: str | None = None, namespace: str | None = None) ‑> None
Expand source code Browse git
def wait(
    condition: str,
    resource: str,
    timeout_secs: int = 300,
    context: str = DEFAULT_K8S_CONTEXT_NAME,
    *,
    label: str | None = None,
    namespace: str | None = None,
) -> None:
    """Run ``kubectl wait`` repeatedly until it reports that the condition holds.

    The command is re-run on every iteration of
    ``ui.timeout_loop(timeout_secs, tick=0.1)``. A non-zero exit status from
    kubectl is logged and retried rather than treated as fatal.

    Args:
        condition: Value passed to ``kubectl wait --for``.
        resource: Resource argument passed to ``kubectl wait``.
        timeout_secs: Used both as kubectl's own ``--timeout`` (in seconds)
            and as the budget handed to ``ui.timeout_loop``.
        context: Value passed to ``--context``.
        label: If given, passed to ``--selector``.
        namespace: If given, passed to ``--namespace``.

    Raises:
        UIError: If the loop ends without kubectl reporting success. The
            last ``CalledProcessError`` seen (if any) is included in the
            message and attached as the cause.
    """
    cmd = [
        "kubectl",
        "wait",
        "--for",
        condition,
        resource,
        "--timeout",
        f"{timeout_secs}s",
        "--context",
        context,
    ]

    if label is not None:
        cmd.extend(["--selector", label])

    if namespace is not None:
        cmd.extend(["--namespace", namespace])

    ui.progress(f'waiting for {" ".join(cmd)} ... ')

    last_error: subprocess.CalledProcessError | None = None
    for _ in ui.timeout_loop(timeout_secs, tick=0.1):
        try:
            raw_output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            # use a less verbose output than log_subprocess_error here
            # Decode leniently: a stray non-ASCII byte in kubectl's message
            # must not abort the retry loop with a UnicodeDecodeError.
            LOGGER.info("%s %s", e, e.output.decode("utf-8", errors="replace"))
            last_error = e
            continue

        output = raw_output.decode("utf-8", errors="replace")
        # output is:
        # - an empty string when a 'delete' condition is satisfied
        # - 'condition met' for all other conditions
        if not output or "condition met" in output:
            ui.progress("success!", finish=True)
            return

    ui.progress(finish=True)
    # NOTE(review): presumably dumps the current pod state to aid debugging
    # the timeout -- confirm against print_pods.
    print_pods()
    raise UIError(
        f"kubectl wait never returned 'condition met': {last_error}"
    ) from last_error