misc.python.materialize.docker

Docker utilities.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Docker utilities."""
 11import re
 12import subprocess
 13import time
 14
 15import requests
 16
 17from materialize import ui
 18from materialize.mz_version import MzVersion
 19
 20CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] = dict()
 21EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()
 22
 23IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR = "--"
 24LATEST_IMAGE_TAG = "latest"
 25LEGACY_IMAGE_TAG_COMMIT_PREFIX = "devel-"
 26MZ_GHCR_DEFAULT = "1" if ui.env_is_truthy("CI") else "0"
 27
 28# Examples:
 29# * v0.114.0
 30# * v0.114.0-dev
 31# * v0.114.0-dev.0--pr.g3d565dd11ba1224a41beb6a584215d99e6b3c576
 32VERSION_IN_IMAGE_TAG_PATTERN = re.compile(r"^(v\d+\.\d+\.\d+(-dev)?)")
 33
 34
 35def image_of_release_version_exists(version: MzVersion, quiet: bool = False) -> bool:
 36    if version.is_dev_version():
 37        raise ValueError(f"Version {version} is a dev version, not a release version")
 38
 39    return mz_image_tag_exists(release_version_to_image_tag(version), quiet=quiet)
 40
 41
 42def image_of_commit_exists(commit_hash: str) -> bool:
 43    try:
 44        return mz_image_tag_exists(commit_to_image_tag(commit_hash))
 45    except (RuntimeError, requests.exceptions.RequestException) as e:
 46        print(f"Failed to check if image of commit {commit_hash} exists: {e}")
 47        return False
 48
 49
 50def mz_image_tag_exists_cmdline(image_name: str) -> bool:
 51    command = [
 52        "docker",
 53        "manifest",
 54        "inspect",
 55        image_name,
 56    ]
 57    try:
 58        subprocess.check_output(command, stderr=subprocess.STDOUT, text=True)
 59        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
 60        return True
 61    except subprocess.CalledProcessError as e:
 62        if "no such manifest:" in e.output:
 63            print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
 64            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
 65        else:
 66            print(f"Failed to fetch image manifest '{image_name}' ({e.output})")
 67            # do not cache the result of unknown error messages
 68        return False
 69
 70
 71# Cache for the GHCR anonymous token to avoid re-fetching it for every image check.
 72_ghcr_token: str | None = None
 73
 74
 75def _get_ghcr_token() -> str:
 76    """Get an anonymous token for pulling from GHCR."""
 77    global _ghcr_token
 78    if _ghcr_token is None:
 79        response = requests.get(
 80            "https://ghcr.io/token?scope=repository:materializeinc/materialize/materialized:pull"
 81        )
 82        response.raise_for_status()
 83        _ghcr_token = response.json()["token"]
 84    assert _ghcr_token is not None
 85    return _ghcr_token
 86
 87
 88def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
 89    """Check if an image tag exists on GHCR using the OCI distribution API."""
 90    try:
 91        token = _get_ghcr_token()
 92        response = requests.head(
 93            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
 94            headers={
 95                "Authorization": f"Bearer {token}",
 96                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
 97            },
 98        )
 99        if response.status_code == 200:
100            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
101            return True
102        elif response.status_code == 404:
103            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
104            return False
105        elif response.status_code == 401:
106            # Token may have expired, clear it and fall back to cmdline
107            global _ghcr_token
108            _ghcr_token = None
109            return mz_image_tag_exists_cmdline(image_name)
110        else:
111            print(
112                f"GHCR API returned unexpected status {response.status_code} for {image_name}"
113            )
114            return mz_image_tag_exists_cmdline(image_name)
115    except requests.exceptions.RequestException:
116        return mz_image_tag_exists_cmdline(image_name)
117
118
def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the `materialized` image tagged `image_tag` exists.

    The image name is `<image_registry()>/materialized:<image_tag>`. Checks, in
    order:

    1. a result memoized in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK;
    2. the local Docker image store (`docker images --quiet`);
    3. the registry: the OCI API for GHCR, the Docker Hub tags API for the
       `materialize` registry, `docker manifest inspect` for any other
       registry or when the Docker Hub request fails.

    Definitive answers are memoized. An unrecognized Docker Hub API response
    returns False without being memoized, so a later call retries.

    Args:
        image_tag: Tag of the `materialized` image.
        quiet: Suppress this function's own progress/diagnostic output
            (helpers it delegates to may still print).

    Errors from the local `docker images` call (e.g.
    subprocess.CalledProcessError) propagate to the caller.
    """
    # Read the registry once so the cache key and the backend choice agree.
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]
    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # Image found locally, no need to query the remote registry.
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.
    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}",
            # Bound the wait; a timeout falls back to the cmdline check below.
            timeout=30,
        )
        result = response.json()
    except (requests.exceptions.RequestException, requests.exceptions.JSONDecodeError):
        # RequestException covers connection errors and timeouts; the JSON
        # decoding error is listed explicitly for readability.
        return mz_image_tag_exists_cmdline(image_name)

    if result.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if "not found" in result.get("message", ""):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False
173
174
def commit_to_image_tag(commit_hash: str) -> str:
    """Return the image tag of the build of `commit_hash` (resolved and memoized).

    May raise RuntimeError when no image can be found for the commit.
    """
    resolved_tag = _resolve_image_name_by_commit_hash(commit_hash)
    return resolved_tag
177
178
def release_version_to_image_tag(version: MzVersion) -> str:
    """Return the image tag for a release version, which is its string form."""
    release_tag = str(version)
    return release_tag
181
182
def is_image_tag_of_release_version(image_tag: str) -> bool:
    """Return whether `image_tag` names a release image.

    A tag is not a release tag when it is "latest", starts with the legacy
    "devel-" commit prefix, or contains the "--" dev-version metadata separator.
    """
    is_non_release = (
        image_tag == LATEST_IMAGE_TAG
        or image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX)
        or IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR in image_tag
    )
    return not is_non_release
189
190
def is_image_tag_of_commit(image_tag: str) -> bool:
    """Return whether `image_tag` identifies the build of a specific commit.

    Recognizes both the legacy "devel-" prefix and tags carrying "--"
    dev-version metadata.
    """
    if image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX):
        return True
    return IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR in image_tag
196
197
def get_version_from_image_tag(image_tag: str) -> str:
    """Return the leading version of an image tag, e.g. "v0.114.0" or "v0.114.0-dev".

    Uses an `assert` to reject tags that do not start with a version.
    """
    version_match = VERSION_IN_IMAGE_TAG_PATTERN.match(image_tag)
    assert version_match is not None, f"Invalid image tag: {image_tag}"
    leading_version = version_match.group(1)
    return leading_version
203
204
def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
    """Parse the leading version of `image_tag` into an MzVersion."""
    version_text = get_version_from_image_tag(image_tag)
    return MzVersion.parse_mz(version_text)
207
208
def _try_construct_image_tag(commit_hash: str) -> str | None:
    """Construct the expected image tag from git info, avoiding Docker Hub search.

    Takes the first line starting with "version" in
    src/environmentd/Cargo.toml at `commit_hash` and builds
    `v<version>--main.g<commit_hash>`. Only this main-branch form is tried.
    The tag is returned only if `mz_image_tag_exists` confirms it. Otherwise,
    and on any error (this is a best-effort shortcut), None is returned.
    """
    try:
        cargo_toml = subprocess.check_output(
            ["git", "show", f"{commit_hash}:src/environmentd/Cargo.toml"],
            stderr=subprocess.DEVNULL,
            text=True,
        )
        first_version_line = next(
            (row for row in cargo_toml.splitlines() if row.startswith("version")),
            None,
        )
        if first_version_line is None:
            return None

        quoted_value = re.search(r'"([^"]+)"', first_version_line)
        if quoted_value is None:
            return None

        candidate = f"v{quoted_value.group(1)}--main.g{commit_hash}"
        return candidate if mz_image_tag_exists(candidate, quiet=True) else None
    except Exception:
        return None
235
236
def _resolve_image_name_by_commit_hash(commit_hash: str) -> str:
    """Resolve and memoize the image tag built from `commit_hash`.

    First tries the tag constructed from git metadata via
    `_try_construct_image_tag`. If that is not confirmed, falls back to
    searching Docker Hub for tags matching the hash and picks one.

    Returns:
        The image tag (not the full `<registry>/materialized:<tag>` name).

    Raises:
        RuntimeError: if the Docker Hub search yields no candidate.
    """
    if commit_hash in CACHED_IMAGE_NAME_BY_COMMIT_HASH:
        return CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash]

    # Try constructing the tag directly from git info (avoids Docker Hub search).
    # A non-None result has already been confirmed, and memoized under its full
    # image name, by mz_image_tag_exists.
    image_tag = _try_construct_image_tag(commit_hash)
    if image_tag is None:
        # Fall back to Docker Hub search
        image_tag_candidates = _search_docker_hub_for_image_name(
            search_value=commit_hash
        )
        image_tag = _select_image_name_from_candidates(
            image_tag_candidates, commit_hash
        )
        # The search listed this tag in Docker Hub's materialize/materialized
        # repository, so it is known to exist there. Record it under the full
        # image name, the key format mz_image_tag_exists uses for the
        # "materialize" registry. The bare tag was never looked up as a key.
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[
            f"materialize/materialized:{image_tag}"
        ] = True

    CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash] = image_tag
    return image_tag
258
259
def _search_docker_hub_for_image_name(
    search_value: str, remaining_retries: int = 3
) -> list[str]:
    """Search Docker Hub's materialize/materialized repository for matching tags.

    Args:
        search_value: Value passed as the `name` filter of the tag search
            (a commit hash in this module).
        remaining_retries: How many further attempts to make, 5 seconds apart,
            after a connection error, timeout, or non-JSON response.

    Returns:
        Matching tag names, skipping legacy "unstable-" tags. Empty when the
        response has no "results" list.

    Raises:
        requests.exceptions.RequestException: once the retries are exhausted.
    """
    url = f"https://hub.docker.com/v2/repositories/materialize/materialized/tags?name={search_value}"
    while True:
        try:
            json_response = requests.get(url, timeout=30).json()
            break
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout,
            requests.exceptions.JSONDecodeError,
        ) as e:
            if remaining_retries <= 0:
                raise
            print(
                f"Searching Docker Hub for image name failed ({e}), retrying in 5 seconds"
            )
            time.sleep(5)
            remaining_retries -= 1

    json_results = json_response.get("results")
    if json_results is None:
        # Presumably an error payload (e.g. rate limiting). Report it rather
        # than failing with a TypeError while iterating None.
        print(f"Docker Hub tag search returned no results: {json_response}")
        return []

    image_names = []
    for entry in json_results:
        image_name = entry.get("name")
        if not image_name:
            continue

        if image_name.startswith("unstable-"):
            # for images with the old version scheme favor "devel-" over "unstable-"
            continue

        image_names.append(image_name)

    return image_names
296
297
298def _select_image_name_from_candidates(
299    image_name_candidates: list[str], commit_hash: str
300) -> str:
301    if len(image_name_candidates) == 0:
302        raise RuntimeError(f"No image found for commit hash {commit_hash}")
303
304    if len(image_name_candidates) > 1:
305        print(
306            f"Multiple images found for commit hash {commit_hash}: {image_name_candidates}, picking first"
307        )
308
309    return image_name_candidates[0]
310
311
def image_registry() -> str:
    """Return the registry prefix that `materialized` images are looked up in.

    Returns GHCR ("ghcr.io/materializeinc/materialize") when the MZ_GHCR
    environment toggle is truthy, and Docker Hub ("materialize") otherwise.
    MZ_GHCR_DEFAULT ("1" in CI, "0" elsewhere) is passed as the fallback for
    an unset MZ_GHCR. Previously a hard-coded "1" left that module constant
    unused. NOTE(review): assumes the second argument of `ui.env_is_truthy`
    is the value used when the variable is unset; confirm.
    """
    return (
        "ghcr.io/materializeinc/materialize"
        if ui.env_is_truthy("MZ_GHCR", MZ_GHCR_DEFAULT)
        else "materialize"
    )
# Memo: commit hash -> resolved image tag.
CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] = dict()
# Memo: full image name -> existence result of an earlier lookup.
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()

IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR = "--"
LATEST_IMAGE_TAG = "latest"
LEGACY_IMAGE_TAG_COMMIT_PREFIX = "devel-"
# Rendered value of the MZ_GHCR default outside CI.
MZ_GHCR_DEFAULT = "0"

# Captures the leading version of an image tag (e.g. "v0.114.0-dev").
VERSION_IN_IMAGE_TAG_PATTERN = re.compile(r"^(v\d+\.\d+\.\d+(-dev)?)")
def image_of_release_version_exists(version: MzVersion, quiet: bool = False) -> bool:
    """Return whether the image published for release `version` exists.

    Raises:
        ValueError: if `version` is a dev version rather than a release.
    """
    if not version.is_dev_version():
        release_tag = release_version_to_image_tag(version)
        return mz_image_tag_exists(release_tag, quiet=quiet)

    raise ValueError(f"Version {version} is a dev version, not a release version")
def image_of_commit_exists(commit_hash: str) -> bool:
    """Return whether an image built from `commit_hash` exists.

    A RuntimeError (e.g. no tag found for the commit) or an HTTP-level
    RequestException is reported on stdout and treated as "does not exist".
    """
    try:
        commit_tag = commit_to_image_tag(commit_hash)
        found = mz_image_tag_exists(commit_tag)
    except (RuntimeError, requests.exceptions.RequestException) as err:
        print(f"Failed to check if image of commit {commit_hash} exists: {err}")
        return False
    return found
def mz_image_tag_exists_cmdline(image_name: str) -> bool:
    """Check whether `image_name` exists via `docker manifest inspect`.

    Success, or a "no such manifest:" failure, is a definitive answer and is
    memoized in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK. Any other failure
    is printed and returns False without being memoized, so a later call retries.
    """
    try:
        subprocess.check_output(
            ["docker", "manifest", "inspect", image_name],
            stderr=subprocess.STDOUT,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        if "no such manifest:" not in err.output:
            # do not cache the result of unknown error messages
            print(f"Failed to fetch image manifest '{image_name}' ({err.output})")
            return False
        print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False

    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
    return True
def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
    """Check if an image tag exists on GHCR using the OCI distribution API.

    Sends a HEAD request for the manifest of `image_tag` in
    ghcr.io/materializeinc/materialize/materialized.

    - 200 or 404 is a definitive answer and is memoized under `image_name` in
      EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK.
    - 401 drops the cached token (so the next call fetches a fresh one) and
      falls back to `mz_image_tag_exists_cmdline`.
    - Any other status, or a request error/timeout, also falls back to
      `mz_image_tag_exists_cmdline`.

    Args:
        image_name: Full image reference; used as the cache key and for the
            fallback check.
        image_tag: Tag whose manifest is requested.
    """
    # Declared up front because the 401 branch resets the module-level token.
    global _ghcr_token
    try:
        token = _get_ghcr_token()
        response = requests.head(
            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
            },
            # Bound the wait; a timeout raises a RequestException and takes
            # the fallback path below instead of hanging.
            timeout=30,
        )
    except requests.exceptions.RequestException:
        return mz_image_tag_exists_cmdline(image_name)

    if response.status_code == 200:
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if response.status_code == 404:
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if response.status_code == 401:
        # Token may have expired, clear it and fall back to cmdline
        _ghcr_token = None
        return mz_image_tag_exists_cmdline(image_name)

    print(
        f"GHCR API returned unexpected status {response.status_code} for {image_name}"
    )
    return mz_image_tag_exists_cmdline(image_name)

Check if an image tag exists on GHCR using the OCI distribution API.

def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the `materialized` image tagged `image_tag` exists.

    The image name is `<image_registry()>/materialized:<image_tag>`. Checks, in
    order:

    1. a result memoized in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK;
    2. the local Docker image store (`docker images --quiet`);
    3. the registry: the OCI API for GHCR, the Docker Hub tags API for the
       `materialize` registry, `docker manifest inspect` for any other
       registry or when the Docker Hub request fails.

    Definitive answers are memoized. An unrecognized Docker Hub API response
    returns False without being memoized, so a later call retries.

    Args:
        image_tag: Tag of the `materialized` image.
        quiet: Suppress this function's own progress/diagnostic output
            (helpers it delegates to may still print).

    Errors from the local `docker images` call (e.g.
    subprocess.CalledProcessError) propagate to the caller.
    """
    # Read the registry once so the cache key and the backend choice agree.
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]
    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # Image found locally, no need to query the remote registry.
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.
    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}",
            # Bound the wait; a timeout falls back to the cmdline check below.
            timeout=30,
        )
        result = response.json()
    except (requests.exceptions.RequestException, requests.exceptions.JSONDecodeError):
        # RequestException covers connection errors and timeouts; the JSON
        # decoding error is listed explicitly for readability.
        return mz_image_tag_exists_cmdline(image_name)

    if result.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if "not found" in result.get("message", ""):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False
def commit_to_image_tag(commit_hash: str) -> str:
    """Return the image tag of the build of `commit_hash` (resolved and memoized).

    May raise RuntimeError when no image can be found for the commit.
    """
    resolved_tag = _resolve_image_name_by_commit_hash(commit_hash)
    return resolved_tag
def release_version_to_image_tag(version: MzVersion) -> str:
    """Return the image tag for a release version, which is its string form."""
    release_tag = str(version)
    return release_tag
def is_image_tag_of_release_version(image_tag: str) -> bool:
    """Return whether `image_tag` names a release image.

    A tag is not a release tag when it is "latest", starts with the legacy
    "devel-" commit prefix, or contains the "--" dev-version metadata separator.
    """
    is_non_release = (
        image_tag == LATEST_IMAGE_TAG
        or image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX)
        or IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR in image_tag
    )
    return not is_non_release
def is_image_tag_of_commit(image_tag: str) -> bool:
    """Return whether `image_tag` identifies the build of a specific commit.

    Recognizes both the legacy "devel-" prefix and tags carrying "--"
    dev-version metadata.
    """
    if image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX):
        return True
    return IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR in image_tag
def get_version_from_image_tag(image_tag: str) -> str:
    """Return the leading version of an image tag, e.g. "v0.114.0" or "v0.114.0-dev".

    Uses an `assert` to reject tags that do not start with a version.
    """
    version_match = VERSION_IN_IMAGE_TAG_PATTERN.match(image_tag)
    assert version_match is not None, f"Invalid image tag: {image_tag}"
    leading_version = version_match.group(1)
    return leading_version
def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
    """Parse the leading version of `image_tag` into an MzVersion."""
    version_text = get_version_from_image_tag(image_tag)
    return MzVersion.parse_mz(version_text)
def image_registry() -> str:
    """Return the registry prefix that `materialized` images are looked up in.

    Returns GHCR ("ghcr.io/materializeinc/materialize") when the MZ_GHCR
    environment toggle is truthy, and Docker Hub ("materialize") otherwise.
    MZ_GHCR_DEFAULT ("1" in CI, "0" elsewhere) is passed as the fallback for
    an unset MZ_GHCR. Previously a hard-coded "1" left that module constant
    unused. NOTE(review): assumes the second argument of `ui.env_is_truthy`
    is the value used when the variable is unset; confirm.
    """
    return (
        "ghcr.io/materializeinc/materialize"
        if ui.env_is_truthy("MZ_GHCR", MZ_GHCR_DEFAULT)
        else "materialize"
    )