misc.python.materialize.docker
Docker utilities.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Docker utilities."""

import re
import subprocess
import time

import requests

from materialize import ui
from materialize.mz_version import MzVersion

# Maps a commit hash to the image tag that was resolved for it.
CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] = dict()
# Maps a full image name ("<registry>/materialized:<tag>") to whether an
# earlier check found it to exist.
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()

IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR = "--"
LATEST_IMAGE_TAG = "latest"
LEGACY_IMAGE_TAG_COMMIT_PREFIX = "devel-"
MZ_GHCR_DEFAULT = "1" if ui.env_is_truthy("CI") else "0"

# Upper bound, in seconds, for every HTTP request issued by this module.
# Without a timeout `requests` waits indefinitely on a stalled connection.
_HTTP_TIMEOUT_SECONDS = 30

# Examples:
# * v0.114.0
# * v0.114.0-dev
# * v0.114.0-dev.0--pr.g3d565dd11ba1224a41beb6a584215d99e6b3c576
VERSION_IN_IMAGE_TAG_PATTERN = re.compile(r"^(v\d+\.\d+\.\d+(-dev)?)")


def image_of_release_version_exists(version: MzVersion, quiet: bool = False) -> bool:
    """Return whether an image exists for the given release version.

    Raises:
        ValueError: if `version` is a dev version.
    """
    if version.is_dev_version():
        raise ValueError(f"Version {version} is a dev version, not a release version")

    return mz_image_tag_exists(release_version_to_image_tag(version), quiet=quiet)


def image_of_commit_exists(commit_hash: str) -> bool:
    """Return whether an image exists for the given commit.

    Lookup failures (`RuntimeError` or any `requests` exception) are printed
    and reported as "does not exist".
    """
    try:
        return mz_image_tag_exists(commit_to_image_tag(commit_hash))
    except (RuntimeError, requests.exceptions.RequestException) as e:
        print(f"Failed to check if image of commit {commit_hash} exists: {e}")
        return False


def mz_image_tag_exists_cmdline(image_name: str) -> bool:
    """Check for an image by running `docker manifest inspect`.

    A successful inspect is cached as "exists". A failure whose output
    contains "no such manifest:" is cached as "does not exist". Any other
    failure returns False without touching the cache.
    """
    inspect_cmd = ["docker", "manifest", "inspect", image_name]
    try:
        subprocess.check_output(inspect_cmd, stderr=subprocess.STDOUT, text=True)
    except subprocess.CalledProcessError as err:
        if "no such manifest:" in err.output:
            print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        else:
            print(f"Failed to fetch image manifest '{image_name}' ({err.output})")
            # do not cache the result of unknown error messages
        return False

    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
    return True


# Cache for the GHCR anonymous token to avoid re-fetching it for every image check.
_ghcr_token: str | None = None


def _get_ghcr_token() -> str:
    """Get an anonymous token for pulling from GHCR.

    The token is fetched once and kept in the module-level `_ghcr_token`.

    Raises:
        requests.exceptions.RequestException: if the token request fails,
            times out, or returns an error status.
    """
    global _ghcr_token
    if _ghcr_token is None:
        response = requests.get(
            "https://ghcr.io/token?scope=repository:materializeinc/materialize/materialized:pull",
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        _ghcr_token = response.json()["token"]
    assert _ghcr_token is not None
    return _ghcr_token


def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
    """Check if an image tag exists on GHCR using the OCI distribution API.

    Status 200 and 404 are cached under `image_name` and returned directly.
    Any other status, and any `requests` failure (including a timeout),
    falls back to `mz_image_tag_exists_cmdline`.
    """
    global _ghcr_token

    try:
        token = _get_ghcr_token()
        response = requests.head(
            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
            },
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
    except requests.exceptions.RequestException:
        return mz_image_tag_exists_cmdline(image_name)

    status = response.status_code
    if status == 200:
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if status == 404:
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False

    if status == 401:
        # Token may have expired, clear it and fall back to cmdline
        _ghcr_token = None
    else:
        print(
            f"GHCR API returned unexpected status {response.status_code} for {image_name}"
        )
    return mz_image_tag_exists_cmdline(image_name)


def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the `materialized` image with the given tag exists.

    Checks, in order: the in-process cache, the local Docker image store,
    and then the remote registry (GHCR API, `docker manifest inspect`, or
    the Docker Hub API, depending on `image_registry()`).

    Args:
        image_tag: tag of the `materialized` image to look for.
        quiet: suppress the informational output printed by this function.
    """
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]

    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # image found locally, can skip querying remote Docker Hub
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.

    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}",
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        result = response.json()
    except (
        requests.exceptions.ConnectionError,
        # A read timeout is not a ConnectionError, so list it explicitly.
        requests.exceptions.Timeout,
        requests.exceptions.JSONDecodeError,
    ):
        return mz_image_tag_exists_cmdline(image_name)

    if result.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    # `message` may be absent or null; normalise so the `in` test cannot fail.
    if "not found" in str(result.get("message") or ""):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False


def commit_to_image_tag(commit_hash: str) -> str:
    """Return the image tag resolved for the given commit hash."""
    return _resolve_image_name_by_commit_hash(commit_hash)


def release_version_to_image_tag(version: MzVersion) -> str:
    """Return the image tag of a release version (its string form)."""
    return str(version)


def is_image_tag_of_release_version(image_tag: str) -> bool:
    """Return whether the tag looks like a release version tag.

    A tag qualifies when it has no dev-metadata separator, does not start
    with the legacy commit prefix, and is not the "latest" tag.
    """
    return (
        IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR not in image_tag
        and not image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX)
        and image_tag != LATEST_IMAGE_TAG
    )


def get_version_from_image_tag(image_tag: str) -> str:
    """Extract the leading version (e.g. "v0.114.0" or "v0.114.0-dev").

    Fails an assertion when the tag does not start with a version.
    """
    match = VERSION_IN_IMAGE_TAG_PATTERN.match(image_tag)
    assert match is not None, f"Invalid image tag: {image_tag}"

    return match.group(1)


def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
    """Parse the version at the start of an image tag into an `MzVersion`."""
    return MzVersion.parse_mz(get_version_from_image_tag(image_tag))


def _try_construct_image_tag(commit_hash: str) -> str | None:
    """Construct the expected image tag from git info, avoiding Docker Hub search.

    Reads the first quoted `version` value from `src/environmentd/Cargo.toml`
    at the given commit and builds `v<version>--main.g<commit_hash>`.
    Returns the tag only if that image exists, otherwise None.
    """
    try:
        cargo_toml = subprocess.check_output(
            ["git", "show", f"{commit_hash}:src/environmentd/Cargo.toml"],
            stderr=subprocess.DEVNULL,
            text=True,
        )
        version = None
        for line in cargo_toml.splitlines():
            if line.startswith("version"):
                match = re.search(r'"([^"]+)"', line)
                if match:
                    version = match.group(1)
                    break

        if version is None:
            return None

        tag = f"v{version}--main.g{commit_hash}"
        if mz_image_tag_exists(tag, quiet=True):
            return tag

        return None
    except Exception:
        # Best effort only: the caller falls back to a Docker Hub search.
        return None


def _resolve_image_name_by_commit_hash(commit_hash: str) -> str:
    """Resolve (and cache) the image tag belonging to a commit hash.

    Raises:
        RuntimeError: if the Docker Hub search finds no candidate.
    """
    if commit_hash in CACHED_IMAGE_NAME_BY_COMMIT_HASH:
        return CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash]

    # Try constructing the tag directly from git info (avoids Docker Hub search)
    constructed_tag = _try_construct_image_tag(commit_hash)
    if constructed_tag is not None:
        image_name = constructed_tag
    else:
        # Fall back to Docker Hub search
        image_name_candidates = _search_docker_hub_for_image_name(
            search_value=commit_hash
        )
        image_name = _select_image_name_from_candidates(
            image_name_candidates, commit_hash
        )

    CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash] = image_name
    # NOTE(review): the key written here is the bare tag, whereas
    # mz_image_tag_exists() looks entries up by "<registry>/materialized:<tag>",
    # so this entry does not appear to be read by that function - confirm
    # whether the full image name was intended.
    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True

    return image_name


def _search_docker_hub_for_image_name(
    search_value: str, remaining_retries: int = 3
) -> list[str]:
    """Return the Docker Hub tag names matching `search_value`.

    Connection errors, timeouts and undecodable responses are retried
    (5 seconds apart) up to `remaining_retries` times, then re-raised.
    Tags starting with "unstable-" are skipped.
    """
    try:
        json_response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags?name={search_value}",
            timeout=_HTTP_TIMEOUT_SECONDS,
        ).json()
    except (
        requests.exceptions.ConnectionError,
        requests.exceptions.Timeout,
        requests.exceptions.JSONDecodeError,
    ) as e:
        if remaining_retries > 0:
            print(
                f"Searching Docker Hub for image name failed ({e}), retrying in 5 seconds"
            )
            time.sleep(5)
            return _search_docker_hub_for_image_name(
                search_value, remaining_retries - 1
            )

        raise

    json_results = json_response.get("results")
    if json_results is None:
        # An error payload carries no "results"; report it and treat it as
        # "no candidates" instead of failing with a TypeError below.
        print(f"Docker Hub search returned no results list: {json_response}")
        json_results = []

    image_names = []

    for entry in json_results:
        image_name = entry.get("name")

        if not image_name:
            # entry without a usable tag name
            continue

        if image_name.startswith("unstable-"):
            # for images with the old version scheme favor "devel-" over "unstable-"
            continue

        image_names.append(image_name)

    return image_names


def _select_image_name_from_candidates(
    image_name_candidates: list[str], commit_hash: str
) -> str:
    """Pick the first candidate, reporting when there are several.

    Raises:
        RuntimeError: if there are no candidates.
    """
    if not image_name_candidates:
        raise RuntimeError(f"No image found for commit hash {commit_hash}")

    if len(image_name_candidates) > 1:
        print(
            f"Multiple images found for commit hash {commit_hash}: {image_name_candidates}, picking first"
        )

    return image_name_candidates[0]


def image_registry() -> str:
    """Return the registry prefix: GHCR when `MZ_GHCR` is truthy, else Docker Hub."""
    return (
        "ghcr.io/materializeinc/materialize"
        if ui.env_is_truthy("MZ_GHCR", "1")
        else "materialize"
    )
CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] =
{}
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] =
{}
IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR =
'--'
LATEST_IMAGE_TAG =
'latest'
LEGACY_IMAGE_TAG_COMMIT_PREFIX =
'devel-'
MZ_GHCR_DEFAULT =
'0'
VERSION_IN_IMAGE_TAG_PATTERN =
re.compile('^(v\\d+\\.\\d+\\.\\d+(-dev)?)')
def
image_of_release_version_exists(version: materialize.mz_version.MzVersion, quiet: bool = False) -> bool:
def
image_of_commit_exists(commit_hash: str) -> bool:
def
mz_image_tag_exists_cmdline(image_name: str) -> bool:
def mz_image_tag_exists_cmdline(image_name: str) -> bool:
    """Check for an image by running `docker manifest inspect`.

    A successful inspect is cached as "exists". A failure whose output
    contains "no such manifest:" is cached as "does not exist". Any other
    failure returns False without touching the cache.
    """
    inspect_cmd = ["docker", "manifest", "inspect", image_name]
    try:
        subprocess.check_output(inspect_cmd, stderr=subprocess.STDOUT, text=True)
    except subprocess.CalledProcessError as err:
        if "no such manifest:" in err.output:
            print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        else:
            print(f"Failed to fetch image manifest '{image_name}' ({err.output})")
            # do not cache the result of unknown error messages
        return False

    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
    return True
def
mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
    """Check if an image tag exists on GHCR using the OCI distribution API.

    Status 200 and 404 are cached under `image_name` and returned directly.
    Any other status, and any `requests` failure (including a timeout),
    falls back to `mz_image_tag_exists_cmdline`.
    """
    global _ghcr_token

    # Without a timeout `requests` waits indefinitely on a stalled connection.
    request_timeout_seconds = 30

    try:
        token = _get_ghcr_token()
        response = requests.head(
            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
            },
            timeout=request_timeout_seconds,
        )
    except requests.exceptions.RequestException:
        return mz_image_tag_exists_cmdline(image_name)

    status = response.status_code
    if status == 200:
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if status == 404:
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False

    if status == 401:
        # Token may have expired, clear it and fall back to cmdline
        _ghcr_token = None
    else:
        print(
            f"GHCR API returned unexpected status {response.status_code} for {image_name}"
        )
    return mz_image_tag_exists_cmdline(image_name)
Check if an image tag exists on GHCR using the OCI distribution API.
def
mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the `materialized` image with the given tag exists.

    Checks, in order: the in-process cache, the local Docker image store,
    and then the remote registry (GHCR API, `docker manifest inspect`, or
    the Docker Hub API, depending on `image_registry()`).

    Args:
        image_tag: tag of the `materialized` image to look for.
        quiet: suppress the informational output printed by this function.
    """
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]

    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # image found locally, can skip querying remote Docker Hub
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.

    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}",
            # Without a timeout `requests` waits indefinitely on a stalled connection.
            timeout=30,
        )
        result = response.json()
    except (
        requests.exceptions.ConnectionError,
        # A read timeout is not a ConnectionError, so list it explicitly.
        requests.exceptions.Timeout,
        requests.exceptions.JSONDecodeError,
    ):
        return mz_image_tag_exists_cmdline(image_name)

    if result.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    # `message` may be absent or null; normalise so the `in` test cannot fail.
    if "not found" in str(result.get("message") or ""):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False
def
commit_to_image_tag(commit_hash: str) -> str:
def
release_version_to_image_tag(version: materialize.mz_version.MzVersion) -> str:
def
is_image_tag_of_release_version(image_tag: str) -> bool:
def
get_version_from_image_tag(image_tag: str) -> str:
def
get_mz_version_from_image_tag(image_tag: str) -> materialize.mz_version.MzVersion:
def
image_registry() -> str: