misc.python.materialize.docker
Docker utilities.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Docker utilities."""
import re
import subprocess
import time

import requests

from materialize import ui
from materialize.mz_version import MzVersion

# Maps a commit hash to the image tag that was resolved for it.
CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] = dict()
# Maps a full image name ("<registry>/materialized:<tag>") to whether it exists.
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()

IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR = "--"
LATEST_IMAGE_TAG = "latest"
LEGACY_IMAGE_TAG_COMMIT_PREFIX = "devel-"
MZ_GHCR_DEFAULT = "1" if ui.env_is_truthy("CI") else "0"

# Upper bound (in seconds) for every registry HTTP request, so that an
# unresponsive endpoint cannot block the caller indefinitely.
_HTTP_TIMEOUT_SECONDS = 30

# Examples:
# * v0.114.0
# * v0.114.0-dev
# * v0.114.0-dev.0--pr.g3d565dd11ba1224a41beb6a584215d99e6b3c576
VERSION_IN_IMAGE_TAG_PATTERN = re.compile(r"^(v\d+\.\d+\.\d+(-dev)?)")


def image_of_release_version_exists(version: MzVersion, quiet: bool = False) -> bool:
    """Return whether an image tagged with the given release version exists.

    Raises:
        ValueError: if `version` is a dev version.
    """
    if version.is_dev_version():
        raise ValueError(f"Version {version} is a dev version, not a release version")

    return mz_image_tag_exists(release_version_to_image_tag(version), quiet=quiet)


def image_of_commit_exists(commit_hash: str) -> bool:
    """Return whether an image for the given commit exists.

    A `RuntimeError` or `requests` exception raised during the lookup is
    printed and reported as `False`.
    """
    try:
        return mz_image_tag_exists(commit_to_image_tag(commit_hash))
    except (RuntimeError, requests.exceptions.RequestException) as e:
        print(f"Failed to check if image of commit {commit_hash} exists: {e}")
        return False


def mz_image_tag_exists_cmdline(image_name: str) -> bool:
    """Check for an image by running `docker manifest inspect`.

    The result is cached on success and on an explicit "no such manifest:"
    failure; any other failure returns `False` without caching.
    """
    command = [
        "docker",
        "manifest",
        "inspect",
        image_name,
    ]
    try:
        subprocess.check_output(command, stderr=subprocess.STDOUT, text=True)
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    except subprocess.CalledProcessError as e:
        if "no such manifest:" in e.output:
            print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        else:
            print(f"Failed to fetch image manifest '{image_name}' ({e.output})")
            # do not cache the result of unknown error messages
        return False


# Cache for the GHCR anonymous token to avoid re-fetching it for every image check.
_ghcr_token: str | None = None


def _get_ghcr_token() -> str:
    """Get an anonymous token for pulling from GHCR.

    The token is fetched once and kept in the module-level `_ghcr_token`.

    Raises:
        requests.exceptions.RequestException: if the token request fails,
            times out, or returns an error status.
    """
    global _ghcr_token
    if _ghcr_token is None:
        response = requests.get(
            "https://ghcr.io/token?scope=repository:materializeinc/materialize/materialized:pull",
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        _ghcr_token = response.json()["token"]
    assert _ghcr_token is not None
    return _ghcr_token


def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
    """Check if an image tag exists on GHCR using the OCI distribution API.

    Status 200 and 404 are cached as "exists" / "does not exist". Any other
    status, and any `requests` exception (including a timeout), falls back to
    `mz_image_tag_exists_cmdline`.
    """
    global _ghcr_token
    try:
        token = _get_ghcr_token()
        response = requests.head(
            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
            },
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        if response.status_code == 200:
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
            return True
        elif response.status_code == 404:
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
            return False
        elif response.status_code == 401:
            # Token may have expired, clear it and fall back to cmdline
            _ghcr_token = None
            return mz_image_tag_exists_cmdline(image_name)
        else:
            print(
                f"GHCR API returned unexpected status {response.status_code} for {image_name}"
            )
            return mz_image_tag_exists_cmdline(image_name)
    except requests.exceptions.RequestException:
        return mz_image_tag_exists_cmdline(image_name)


def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the `materialized` image with the given tag exists.

    Lookup order: the in-process cache, the local Docker image store, then the
    configured registry (GHCR API, `docker manifest inspect` for other
    registries, or the Docker Hub API for the "materialize" registry).

    Args:
        image_tag: tag of the image, without registry or repository.
        quiet: suppress the informational output of this function.

    Raises:
        subprocess.CalledProcessError: if the local `docker images` call fails.
    """
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]

    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # image found locally, can skip querying remote Docker Hub
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.

    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}",
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        result = response.json()
    except (
        requests.exceptions.ConnectionError,
        # A read timeout is not a ConnectionError, so it is listed explicitly.
        requests.exceptions.Timeout,
        requests.exceptions.JSONDecodeError,
    ):
        return mz_image_tag_exists_cmdline(image_name)

    if result.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if "not found" in result.get("message", ""):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False


def commit_to_image_tag(commit_hash: str) -> str:
    """Return the image tag resolved for the given commit hash."""
    return _resolve_image_name_by_commit_hash(commit_hash)


def release_version_to_image_tag(version: MzVersion) -> str:
    """Return the image tag of a release version (its string form)."""
    return str(version)


def is_image_tag_of_release_version(image_tag: str) -> bool:
    """Return whether the tag is neither a commit tag nor the "latest" tag."""
    return (
        IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR not in image_tag
        and not image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX)
        and image_tag != LATEST_IMAGE_TAG
    )


def is_image_tag_of_commit(image_tag: str) -> bool:
    """Return whether the tag contains "--" or starts with "devel-"."""
    return (
        IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR in image_tag
        or image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX)
    )


def get_version_from_image_tag(image_tag: str) -> str:
    """Extract the leading version (e.g. "v0.114.0" or "v0.114.0-dev") of a tag.

    Fails with an `AssertionError` if the tag does not start with a version.
    """
    match = VERSION_IN_IMAGE_TAG_PATTERN.match(image_tag)
    assert match is not None, f"Invalid image tag: {image_tag}"

    return match.group(1)


def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
    """Parse the version at the start of an image tag into an `MzVersion`."""
    return MzVersion.parse_mz(get_version_from_image_tag(image_tag))


def _try_construct_image_tag(commit_hash: str) -> str | None:
    """Construct the expected image tag from git info, avoiding Docker Hub search.

    Reads the first `version` line of `src/environmentd/Cargo.toml` at the
    given commit and returns `v<version>--main.g<commit_hash>` if an image
    with that tag exists. Returns `None` in every other case, including any
    error (this is a best-effort shortcut).
    """
    try:
        cargo_toml = subprocess.check_output(
            ["git", "show", f"{commit_hash}:src/environmentd/Cargo.toml"],
            stderr=subprocess.DEVNULL,
            text=True,
        )
        version = None
        for line in cargo_toml.splitlines():
            if line.startswith("version"):
                match = re.search(r'"([^"]+)"', line)
                if match:
                    version = match.group(1)
                    break

        if version is None:
            return None

        tag = f"v{version}--main.g{commit_hash}"
        if mz_image_tag_exists(tag, quiet=True):
            return tag

        return None
    except Exception:
        return None


def _resolve_image_name_by_commit_hash(commit_hash: str) -> str:
    """Resolve (and cache) the image tag that belongs to a commit hash.

    Raises:
        RuntimeError: if the Docker Hub search yields no candidate.
    """
    if commit_hash in CACHED_IMAGE_NAME_BY_COMMIT_HASH:
        return CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash]

    # Try constructing the tag directly from git info (avoids Docker Hub search)
    constructed_tag = _try_construct_image_tag(commit_hash)
    if constructed_tag is not None:
        image_name = constructed_tag
    else:
        # Fall back to Docker Hub search
        image_name_candidates = _search_docker_hub_for_image_name(
            search_value=commit_hash
        )
        image_name = _select_image_name_from_candidates(
            image_name_candidates, commit_hash
        )

    CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash] = image_name
    # NOTE(review): the key written here is a bare tag, while
    # mz_image_tag_exists reads this cache with "<registry>/materialized:<tag>"
    # keys, so this entry looks like it is never hit. Confirm the intended key.
    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True

    return image_name


def _search_docker_hub_for_image_name(
    search_value: str, remaining_retries: int = 3
) -> list[str]:
    """Search the Docker Hub tags of `materialize/materialized` by name.

    Connection errors, timeouts, and undecodable responses are retried up to
    `remaining_retries` times with a pause of 5 seconds; after that the last
    exception is re-raised. Tags starting with "unstable-" are dropped. A
    response without a "results" list yields an empty list.
    """
    while True:
        try:
            json_response = requests.get(
                "https://hub.docker.com/v2/repositories/materialize/materialized/tags",
                params={"name": search_value},
                timeout=_HTTP_TIMEOUT_SECONDS,
            ).json()
            break
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
            requests.exceptions.JSONDecodeError,
        ) as e:
            if remaining_retries <= 0:
                raise

            print(
                f"Searching Docker Hub for image name failed ({e}), retrying in 5 seconds"
            )
            time.sleep(5)
            remaining_retries -= 1

    # Error payloads carry no "results" key; treat them as "nothing found"
    # instead of failing with a TypeError on iteration.
    json_results = json_response.get("results") or []

    image_names = []

    for entry in json_results:
        image_name = entry.get("name")

        if not image_name:
            continue

        if image_name.startswith("unstable-"):
            # for images with the old version scheme favor "devel-" over "unstable-"
            continue

        image_names.append(image_name)

    return image_names


def _select_image_name_from_candidates(
    image_name_candidates: list[str], commit_hash: str
) -> str:
    """Pick the first candidate; report when there are several.

    Raises:
        RuntimeError: if there is no candidate at all.
    """
    if len(image_name_candidates) == 0:
        raise RuntimeError(f"No image found for commit hash {commit_hash}")

    if len(image_name_candidates) > 1:
        print(
            f"Multiple images found for commit hash {commit_hash}: {image_name_candidates}, picking first"
        )

    return image_name_candidates[0]


def image_registry() -> str:
    """Return the registry prefix: GHCR if `MZ_GHCR` is truthy, else Docker Hub."""
    return (
        "ghcr.io/materializeinc/materialize"
        if ui.env_is_truthy("MZ_GHCR", "1")
        else "materialize"
    )
CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] =
{}
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] =
{}
IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR =
'--'
LATEST_IMAGE_TAG =
'latest'
LEGACY_IMAGE_TAG_COMMIT_PREFIX =
'devel-'
MZ_GHCR_DEFAULT =
'0'
VERSION_IN_IMAGE_TAG_PATTERN =
re.compile('^(v\\d+\\.\\d+\\.\\d+(-dev)?)')
def
image_of_release_version_exists(version: materialize.mz_version.MzVersion, quiet: bool = False) -> bool:
def
image_of_commit_exists(commit_hash: str) -> bool:
def
mz_image_tag_exists_cmdline(image_name: str) -> bool:
def mz_image_tag_exists_cmdline(image_name: str) -> bool:
    """Ask `docker manifest inspect` whether the given image exists.

    A successful call and an explicit "no such manifest:" failure are stored
    in the existence cache. Any other failure only returns `False`.
    """
    try:
        subprocess.check_output(
            ["docker", "manifest", "inspect", image_name],
            stderr=subprocess.STDOUT,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        manifest_missing = "no such manifest:" in err.output
        if not manifest_missing:
            # Unknown failure: report it, but keep it out of the cache.
            print(f"Failed to fetch image manifest '{image_name}' ({err.output})")
            return False

        print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False

    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
    return True
def
mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
    """Check if an image tag exists on GHCR using the OCI distribution API.

    Status 200 and 404 are cached as "exists" / "does not exist". Any other
    status, and any `requests` exception (including a timeout), falls back to
    `mz_image_tag_exists_cmdline`.

    Args:
        image_name: full image name, used as cache key and for the fallback.
        image_tag: tag to look up in the GHCR manifest endpoint.
    """
    global _ghcr_token
    # Bound the request so an unresponsive registry cannot hang the caller; a
    # timeout is a RequestException and therefore takes the fallback below.
    request_timeout_seconds = 30
    try:
        token = _get_ghcr_token()
        response = requests.head(
            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
            },
            timeout=request_timeout_seconds,
        )
        if response.status_code == 200:
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
            return True
        elif response.status_code == 404:
            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
            return False
        elif response.status_code == 401:
            # Token may have expired, clear it and fall back to cmdline
            _ghcr_token = None
            return mz_image_tag_exists_cmdline(image_name)
        else:
            print(
                f"GHCR API returned unexpected status {response.status_code} for {image_name}"
            )
            return mz_image_tag_exists_cmdline(image_name)
    except requests.exceptions.RequestException:
        return mz_image_tag_exists_cmdline(image_name)
Check if an image tag exists on GHCR using the OCI distribution API.
def
mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the `materialized` image with the given tag exists.

    Lookup order: the in-process cache, the local Docker image store, then the
    configured registry (GHCR API, `docker manifest inspect` for other
    registries, or the Docker Hub API for the "materialize" registry).

    Args:
        image_tag: tag of the image, without registry or repository.
        quiet: suppress the informational output of this function.

    Raises:
        subprocess.CalledProcessError: if the local `docker images` call fails.
    """
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]

    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # image found locally, can skip querying remote Docker Hub
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.

    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}",
            # Bound the request so an unresponsive API cannot hang the caller.
            timeout=30,
        )
        result = response.json()
    except (
        requests.exceptions.ConnectionError,
        # A read timeout is not a ConnectionError, so it is listed explicitly.
        requests.exceptions.Timeout,
        requests.exceptions.JSONDecodeError,
    ):
        return mz_image_tag_exists_cmdline(image_name)

    if result.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if "not found" in result.get("message", ""):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False
def
commit_to_image_tag(commit_hash: str) -> str:
def
release_version_to_image_tag(version: materialize.mz_version.MzVersion) -> str:
def
is_image_tag_of_release_version(image_tag: str) -> bool:
def
is_image_tag_of_commit(image_tag: str) -> bool:
def
get_version_from_image_tag(image_tag: str) -> str:
def
get_mz_version_from_image_tag(image_tag: str) -> materialize.mz_version.MzVersion:
def
image_registry() -> str: