Expand source code
Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import logging
import subprocess
from materialize import ui
from materialize.cloudtest import DEFAULT_K8S_CONTEXT_NAME
from materialize.cloudtest.util.print_pods import print_pods
from materialize.ui import UIError
LOGGER = logging.getLogger(__name__)
def wait(
condition: str,
resource: str,
timeout_secs: int = 300,
context: str = DEFAULT_K8S_CONTEXT_NAME,
*,
label: str | None = None,
namespace: str | None = None,
) -> None:
cmd = [
"kubectl",
"wait",
"--for",
condition,
resource,
"--timeout",
f"{timeout_secs}s",
"--context",
context,
]
if label is not None:
cmd.extend(["--selector", label])
if namespace is not None:
cmd.extend(["--namespace", namespace])
ui.progress(f'waiting for {" ".join(cmd)} ... ')
error = None
for remaining in ui.timeout_loop(timeout_secs, tick=0.1):
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode(
"ascii"
)
# output is:
# - an empty string when a 'delete' condition is satisfied
# - 'condition met' for all other conditions
if len(output) == 0 or "condition met" in output:
ui.progress("success!", finish=True)
return
except subprocess.CalledProcessError as e:
# use a less verbose output than log_subprocess_error here
LOGGER.info(f"{e} {e.output.decode('ascii')}")
error = e
ui.progress(finish=True)
print_pods()
raise UIError(f"kubectl wait never returned 'condition met': {error}")