Module materialize.docker
Docker utilities.
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
"""Docker utilities."""
import subprocess
import requests
from materialize.mz_version import MzVersion
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()
IMAGE_TAG_OF_VERSION_PREFIX = "v"
IMAGE_TAG_OF_COMMIT_PREFIX = "devel-"
def image_of_release_version_exists(version: MzVersion) -> bool:
return mz_image_tag_exists(version_to_image_tag(version))
def image_of_commit_exists(commit_hash: str) -> bool:
return mz_image_tag_exists(commit_to_image_tag(commit_hash))
def mz_image_tag_exists(image_tag: str) -> bool:
image_name = f"materialize/materialized:{image_tag}"
if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
print(
f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
)
return image_exists
print(f"Checking existence of image manifest: {image_name}")
command_local = ["docker", "images", "--quiet", image_name]
output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
if output:
# image found locally, can skip querying remote Docker Hub
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
return True
# docker manifest inspect counts against the Docker Hub rate limits, even
# when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
# so use the API instead.
try:
response = requests.get(
f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}"
)
except requests.exceptions.ConnectionError:
command = [
"docker",
"manifest",
"inspect",
image_name,
]
try:
subprocess.check_output(command, stderr=subprocess.STDOUT, text=True)
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
return True
except subprocess.CalledProcessError as e:
if "no such manifest:" in e.output:
print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
else:
print(f"Failed to fetch image manifest '{image_name}' ({e.output})")
# do not cache the result of unknown error messages
return False
result = response.json()
if result.get("images"):
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
return True
if "not found" in result.get("message", ""):
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
return False
print(f"Failed to fetch image info from API: {result}")
# do not cache the result of unknown error messages
return False
def commit_to_image_tag(commit_hash: str) -> str:
return f"{IMAGE_TAG_OF_COMMIT_PREFIX}{commit_hash}"
def version_to_image_tag(version: MzVersion) -> str:
return str(version)
def is_image_tag_of_version(image_tag: str) -> bool:
return image_tag.startswith(IMAGE_TAG_OF_VERSION_PREFIX)
def is_image_tag_of_commit(image_tag: str) -> bool:
return image_tag.startswith(IMAGE_TAG_OF_COMMIT_PREFIX)
def get_version_from_image_tag(image_tag: str) -> str:
assert image_tag.startswith(IMAGE_TAG_OF_VERSION_PREFIX)
# image tag is equal to the version
return image_tag
def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
return MzVersion.parse_mz(get_version_from_image_tag(image_tag))
def get_commit_from_image_tag(image_tag: str) -> str:
assert image_tag.startswith(IMAGE_TAG_OF_COMMIT_PREFIX)
return image_tag.removeprefix(IMAGE_TAG_OF_COMMIT_PREFIX)
Functions
def commit_to_image_tag(commit_hash: str) ‑> str
-
Expand source code Browse git
def commit_to_image_tag(commit_hash: str) -> str: return f"{IMAGE_TAG_OF_COMMIT_PREFIX}{commit_hash}"
def get_commit_from_image_tag(image_tag: str) ‑> str
-
Expand source code Browse git
def get_commit_from_image_tag(image_tag: str) -> str: assert image_tag.startswith(IMAGE_TAG_OF_COMMIT_PREFIX) return image_tag.removeprefix(IMAGE_TAG_OF_COMMIT_PREFIX)
def get_mz_version_from_image_tag(image_tag: str) ‑> MzVersion
-
Expand source code Browse git
def get_mz_version_from_image_tag(image_tag: str) -> MzVersion: return MzVersion.parse_mz(get_version_from_image_tag(image_tag))
def get_version_from_image_tag(image_tag: str) ‑> str
-
Expand source code Browse git
def get_version_from_image_tag(image_tag: str) -> str: assert image_tag.startswith(IMAGE_TAG_OF_VERSION_PREFIX) # image tag is equal to the version return image_tag
def image_of_commit_exists(commit_hash: str) ‑> bool
-
Expand source code Browse git
def image_of_commit_exists(commit_hash: str) -> bool: return mz_image_tag_exists(commit_to_image_tag(commit_hash))
def image_of_release_version_exists(version: MzVersion) ‑> bool
-
Expand source code Browse git
def image_of_release_version_exists(version: MzVersion) -> bool: return mz_image_tag_exists(version_to_image_tag(version))
def is_image_tag_of_commit(image_tag: str) ‑> bool
-
Expand source code Browse git
def is_image_tag_of_commit(image_tag: str) -> bool: return image_tag.startswith(IMAGE_TAG_OF_COMMIT_PREFIX)
def is_image_tag_of_version(image_tag: str) ‑> bool
-
Expand source code Browse git
def is_image_tag_of_version(image_tag: str) -> bool: return image_tag.startswith(IMAGE_TAG_OF_VERSION_PREFIX)
def mz_image_tag_exists(image_tag: str) ‑> bool
-
Expand source code Browse git
def mz_image_tag_exists(image_tag: str) -> bool: image_name = f"materialize/materialized:{image_tag}" if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] print( f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}" ) return image_exists print(f"Checking existence of image manifest: {image_name}") command_local = ["docker", "images", "--quiet", image_name] output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True) if output: # image found locally, can skip querying remote Docker Hub EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True return True # docker manifest inspect counts against the Docker Hub rate limits, even # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/, # so use the API instead. try: response = requests.get( f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}" ) except requests.exceptions.ConnectionError: command = [ "docker", "manifest", "inspect", image_name, ] try: subprocess.check_output(command, stderr=subprocess.STDOUT, text=True) EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True return True except subprocess.CalledProcessError as e: if "no such manifest:" in e.output: print(f"Failed to fetch image manifest '{image_name}' (does not exist)") EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False else: print(f"Failed to fetch image manifest '{image_name}' ({e.output})") # do not cache the result of unknown error messages return False result = response.json() if result.get("images"): EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True return True if "not found" in result.get("message", ""): EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False return False print(f"Failed to fetch image info from API: {result}") # do not cache the result of unknown error messages return False
def version_to_image_tag(version: MzVersion) ‑> str
-
Expand source code Browse git
def version_to_image_tag(version: MzVersion) -> str: return str(version)