misc.python.materialize.docker

Docker utilities.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Docker utilities."""
 11
 12import re
 13import subprocess
 14import time
 15
 16import requests
 17
 18from materialize import ui
 19from materialize.mz_version import MzVersion
 20
 21CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] = dict()
 22EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = dict()
 23
 24IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR = "--"
 25LATEST_IMAGE_TAG = "latest"
 26LEGACY_IMAGE_TAG_COMMIT_PREFIX = "devel-"
 27MZ_GHCR_DEFAULT = "1" if ui.env_is_truthy("CI") else "0"
 28
 29# Examples:
 30# * v0.114.0
 31# * v0.114.0-dev
 32# * v0.114.0-dev.0--pr.g3d565dd11ba1224a41beb6a584215d99e6b3c576
 33VERSION_IN_IMAGE_TAG_PATTERN = re.compile(r"^(v\d+\.\d+\.\d+(-dev)?)")
 34
 35
 36def image_of_release_version_exists(version: MzVersion, quiet: bool = False) -> bool:
 37    if version.is_dev_version():
 38        raise ValueError(f"Version {version} is a dev version, not a release version")
 39
 40    return mz_image_tag_exists(release_version_to_image_tag(version), quiet=quiet)
 41
 42
 43def image_of_commit_exists(commit_hash: str) -> bool:
 44    try:
 45        return mz_image_tag_exists(commit_to_image_tag(commit_hash))
 46    except (RuntimeError, requests.exceptions.RequestException) as e:
 47        print(f"Failed to check if image of commit {commit_hash} exists: {e}")
 48        return False
 49
 50
 51def mz_image_tag_exists_cmdline(image_name: str) -> bool:
 52    command = [
 53        "docker",
 54        "manifest",
 55        "inspect",
 56        image_name,
 57    ]
 58    try:
 59        subprocess.check_output(command, stderr=subprocess.STDOUT, text=True)
 60        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
 61        return True
 62    except subprocess.CalledProcessError as e:
 63        if "no such manifest:" in e.output:
 64            print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
 65            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
 66        else:
 67            print(f"Failed to fetch image manifest '{image_name}' ({e.output})")
 68            # do not cache the result of unknown error messages
 69        return False
 70
 71
 72# Cache for the GHCR anonymous token to avoid re-fetching it for every image check.
 73_ghcr_token: str | None = None
 74
 75
 76def _get_ghcr_token() -> str:
 77    """Get an anonymous token for pulling from GHCR."""
 78    global _ghcr_token
 79    if _ghcr_token is None:
 80        response = requests.get(
 81            "https://ghcr.io/token?scope=repository:materializeinc/materialize/materialized:pull"
 82        )
 83        response.raise_for_status()
 84        _ghcr_token = response.json()["token"]
 85    assert _ghcr_token is not None
 86    return _ghcr_token
 87
 88
 89def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
 90    """Check if an image tag exists on GHCR using the OCI distribution API."""
 91    try:
 92        token = _get_ghcr_token()
 93        response = requests.head(
 94            f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}",
 95            headers={
 96                "Authorization": f"Bearer {token}",
 97                "Accept": "application/vnd.oci.image.index.v1+json, application/vnd.docker.distribution.manifest.list.v2+json, application/vnd.docker.distribution.manifest.v2+json",
 98            },
 99        )
100        if response.status_code == 200:
101            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
102            return True
103        elif response.status_code == 404:
104            EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
105            return False
106        elif response.status_code == 401:
107            # Token may have expired, clear it and fall back to cmdline
108            global _ghcr_token
109            _ghcr_token = None
110            return mz_image_tag_exists_cmdline(image_name)
111        else:
112            print(
113                f"GHCR API returned unexpected status {response.status_code} for {image_name}"
114            )
115            return mz_image_tag_exists_cmdline(image_name)
116    except requests.exceptions.RequestException:
117        return mz_image_tag_exists_cmdline(image_name)
118
119
def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the ``materialized`` image with ``image_tag`` exists.

    Lookup order:

    1. the in-process cache;
    2. the local Docker daemon;
    3. the remote registry chosen by ``image_registry()``: the GHCR manifest
       API, the Docker Hub tags API, or ``docker manifest inspect`` for any
       other registry.

    Definitive answers are cached in ``EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK``.
    An unrecognized Docker Hub API reply yields ``False`` without being cached.

    :param image_tag: tag of the ``materialized`` image, e.g. ``v0.114.0``.
    :param quiet: suppress progress and diagnostic output.
    """
    # Resolve the registry once so the cache key and the remote check agree.
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]
    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # Image is present locally, so there is no need to query a remote registry.
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.
    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}"
        )
        result = response.json()
    except requests.exceptions.RequestException:
        # RequestException covers connection failures and undecodable JSON
        # (requests.exceptions.JSONDecodeError subclasses it), as well as other
        # transport errors. Fall back to the slower CLI check, as the GHCR path does.
        return mz_image_tag_exists_cmdline(image_name)

    # Guard against a JSON reply that is not an object.
    payload = result if isinstance(result, dict) else {}
    if payload.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if "not found" in str(payload.get("message", "")):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False
174
175
def commit_to_image_tag(commit_hash: str) -> str:
    """Return the image tag for ``commit_hash``, resolving it on first use and memoizing it."""
    resolved_tag = _resolve_image_name_by_commit_hash(commit_hash)
    return resolved_tag
178
179
def release_version_to_image_tag(version: MzVersion) -> str:
    """Map a release version to its image tag, which is the version's string form."""
    image_tag = str(version)
    return image_tag
182
183
def is_image_tag_of_release_version(image_tag: str) -> bool:
    """Return ``True`` if ``image_tag`` looks like a release tag.

    ``latest``, legacy commit tags (``devel-`` prefix) and dev tags (containing
    the ``--`` metadata separator) are not release tags.
    """
    if image_tag == LATEST_IMAGE_TAG:
        return False
    if image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX):
        return False
    return IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR not in image_tag
190
191
def get_version_from_image_tag(image_tag: str) -> str:
    """Extract the leading version (``vX.Y.Z`` or ``vX.Y.Z-dev``) from an image tag.

    Fails an assertion when the tag does not start with such a version.
    """
    version_match = VERSION_IN_IMAGE_TAG_PATTERN.match(image_tag)
    assert version_match is not None, f"Invalid image tag: {image_tag}"
    return version_match[1]
197
198
def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
    """Parse the leading version of ``image_tag`` into an ``MzVersion``."""
    version_text = get_version_from_image_tag(image_tag)
    return MzVersion.parse_mz(version_text)
201
202
203def _try_construct_image_tag(commit_hash: str) -> str | None:
204    """Construct the expected image tag from git info, avoiding Docker Hub search."""
205    try:
206        cargo_toml = subprocess.check_output(
207            ["git", "show", f"{commit_hash}:src/environmentd/Cargo.toml"],
208            stderr=subprocess.DEVNULL,
209            text=True,
210        )
211        version = None
212        for line in cargo_toml.splitlines():
213            if line.startswith("version"):
214                match = re.search(r'"([^"]+)"', line)
215                if match:
216                    version = match.group(1)
217                break
218
219        if version is None:
220            return None
221
222        tag = f"v{version}--main.g{commit_hash}"
223        if mz_image_tag_exists(tag, quiet=True):
224            return tag
225
226        return None
227    except Exception:
228        return None
229
230
def _resolve_image_name_by_commit_hash(commit_hash: str) -> str:
    """Resolve and memoize the ``materialized`` image tag built from ``commit_hash``.

    First tries to derive the tag from git (``_try_construct_image_tag``). If
    that fails, searches Docker Hub tags for the hash and takes the first
    candidate. Despite the function name, the value returned is an image *tag*,
    not a full image name.

    :raises RuntimeError: if the Docker Hub search yields no candidate.
    """
    if commit_hash in CACHED_IMAGE_NAME_BY_COMMIT_HASH:
        return CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash]

    # Try constructing the tag directly from git info (avoids Docker Hub search).
    image_tag = _try_construct_image_tag(commit_hash)
    if image_tag is None:
        # Fall back to Docker Hub search.
        candidates = _search_docker_hub_for_image_name(search_value=commit_hash)
        image_tag = _select_image_name_from_candidates(candidates, commit_hash)
        # The existence cache is keyed by full image names
        # ("<registry>/materialized:<tag>"), not by bare tags. The search
        # queried Docker Hub, so record the hit under the Docker Hub key that
        # mz_image_tag_exists() looks up. A constructed tag needs no such entry
        # because mz_image_tag_exists() already cached it when confirming it.
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[
            f"materialize/materialized:{image_tag}"
        ] = True

    CACHED_IMAGE_NAME_BY_COMMIT_HASH[commit_hash] = image_tag
    return image_tag
252
253
def _search_docker_hub_for_image_name(
    search_value: str, remaining_retries: int = 3
) -> list[str]:
    """Return Docker Hub tags of ``materialize/materialized`` matching ``search_value``.

    The tags are those returned by the Docker Hub tags API's ``name`` filter.
    Only the first page of results returned by that single request is examined.
    Tags starting with ``unstable-`` are dropped, because for images with the
    old version scheme ``devel-`` tags are preferred.

    Connection and JSON-decoding failures are retried up to
    ``remaining_retries`` times, 5 seconds apart, and then re-raised.

    :raises RuntimeError: if the response carries no ``results`` list.
    """
    url = f"https://hub.docker.com/v2/repositories/materialize/materialized/tags?name={search_value}"
    while True:
        try:
            json_response = requests.get(url).json()
            break
        except (
            requests.exceptions.ConnectionError,
            requests.exceptions.JSONDecodeError,
        ) as e:
            if remaining_retries <= 0:
                raise
            print(
                f"Searching Docker Hub for image name failed ({e}), retrying in 5 seconds"
            )
            time.sleep(5)
            remaining_retries -= 1

    json_results = (
        json_response.get("results") if isinstance(json_response, dict) else None
    )
    if not isinstance(json_results, list):
        # Error payloads (presumably rate limiting or API errors) carry no
        # "results" list. Surface them as RuntimeError, which
        # image_of_commit_exists() handles, instead of failing with a TypeError
        # while iterating.
        raise RuntimeError(
            f"Unexpected Docker Hub response when searching for {search_value}: {json_response}"
        )

    # For images with the old version scheme, favor "devel-" over "unstable-".
    return [
        name
        for name in (entry.get("name") for entry in json_results)
        if name is not None and not name.startswith("unstable-")
    ]
290
291
292def _select_image_name_from_candidates(
293    image_name_candidates: list[str], commit_hash: str
294) -> str:
295    if len(image_name_candidates) == 0:
296        raise RuntimeError(f"No image found for commit hash {commit_hash}")
297
298    if len(image_name_candidates) > 1:
299        print(
300            f"Multiple images found for commit hash {commit_hash}: {image_name_candidates}, picking first"
301        )
302
303    return image_name_candidates[0]
304
305
def image_registry() -> str:
    """Return the registry prefix used for Materialize images.

    GHCR (``ghcr.io/materializeinc/materialize``) is used when the ``MZ_GHCR``
    environment variable is truthy; otherwise Docker Hub's ``materialize``
    namespace is used. ``MZ_GHCR_DEFAULT`` is passed as the fallback value, so
    an unset ``MZ_GHCR`` means GHCR in CI and Docker Hub elsewhere. Previously
    "1" was hard-coded here, leaving ``MZ_GHCR_DEFAULT`` unused.
    """
    return (
        "ghcr.io/materializeinc/materialize"
        if ui.env_is_truthy("MZ_GHCR", MZ_GHCR_DEFAULT)
        else "materialize"
    )
# Memo: commit hash -> resolved image tag.
CACHED_IMAGE_NAME_BY_COMMIT_HASH: dict[str, str] = {}
# Memo: image name -> whether the image was found to exist.
EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK: dict[str, bool] = {}
# Separates the version from build metadata in dev image tags
# (e.g. "v0.114.0-dev.0--pr.g<sha>").
IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR = '--'
LATEST_IMAGE_TAG = 'latest'
# Prefix of commit-based tags from the legacy naming scheme.
LEGACY_IMAGE_TAG_COMMIT_PREFIX = 'devel-'
# Rendered value. In the module source this is computed at import time from
# the CI environment variable ("1" in CI, "0" otherwise).
MZ_GHCR_DEFAULT = '0'
# Captures the leading "vX.Y.Z" (optionally followed by "-dev") of an image tag.
VERSION_IN_IMAGE_TAG_PATTERN = re.compile('^(v\\d+\\.\\d+\\.\\d+(-dev)?)')
def image_of_release_version_exists(version: MzVersion, quiet: bool = False) -> bool:
    """Return whether an image exists for the release ``version``.

    :param version: a release (non-dev) version.
    :param quiet: suppress progress output from the existence check.
    :raises ValueError: if ``version`` is a dev version.
    """
    if not version.is_dev_version():
        release_tag = release_version_to_image_tag(version)
        return mz_image_tag_exists(release_tag, quiet=quiet)

    raise ValueError(f"Version {version} is a dev version, not a release version")
def image_of_commit_exists(commit_hash: str) -> bool:
    """Return whether an image built from ``commit_hash`` exists.

    Failures that surface as ``RuntimeError`` or as a ``requests`` exception
    (while resolving the tag or checking for it) are printed and reported as
    "does not exist".
    """
    tolerated_errors = (RuntimeError, requests.exceptions.RequestException)
    try:
        resolved_tag = commit_to_image_tag(commit_hash)
        exists = mz_image_tag_exists(resolved_tag)
    except tolerated_errors as err:
        print(f"Failed to check if image of commit {commit_hash} exists: {err}")
        exists = False
    return exists
def mz_image_tag_exists_cmdline(image_name: str) -> bool:
    """Check whether ``image_name`` exists by running ``docker manifest inspect``.

    Success is cached as ``True`` and a "no such manifest" failure as ``False``
    in ``EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK``. Any other failure is
    printed and reported as ``False`` without caching, so a later call retries.
    """
    try:
        subprocess.check_output(
            ["docker", "manifest", "inspect", image_name],
            stderr=subprocess.STDOUT,
            text=True,
        )
    except subprocess.CalledProcessError as err:
        if "no such manifest:" not in err.output:
            print(f"Failed to fetch image manifest '{image_name}' ({err.output})")
            # Unknown failure: leave the cache untouched.
            return False
        print(f"Failed to fetch image manifest '{image_name}' (does not exist)")
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False

    EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
    return True
def mz_image_tag_exists_ghcr(image_name: str, image_tag: str) -> bool:
    """Check if an image tag exists on GHCR using the OCI distribution API.

    Sends a HEAD request for the tag's manifest using an anonymous pull token.
    HTTP 200 and 404 are definitive and cached under ``image_name``. A 401
    drops the memoized token, since it may have expired. A 401, any other
    status, or a ``requests`` error defers to ``mz_image_tag_exists_cmdline``.
    """
    global _ghcr_token

    accepted_manifest_types = ", ".join(
        [
            "application/vnd.oci.image.index.v1+json",
            "application/vnd.docker.distribution.manifest.list.v2+json",
            "application/vnd.docker.distribution.manifest.v2+json",
        ]
    )
    manifest_url = f"https://ghcr.io/v2/materializeinc/materialize/materialized/manifests/{image_tag}"

    try:
        token = _get_ghcr_token()
        response = requests.head(
            manifest_url,
            headers={
                "Authorization": f"Bearer {token}",
                "Accept": accepted_manifest_types,
            },
        )
    except requests.exceptions.RequestException:
        return mz_image_tag_exists_cmdline(image_name)

    status = response.status_code
    if status in (200, 404):
        exists = status == 200
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = exists
        return exists

    if status == 401:
        # Token may have expired, clear it and fall back to cmdline
        _ghcr_token = None
    else:
        print(f"GHCR API returned unexpected status {status} for {image_name}")
    return mz_image_tag_exists_cmdline(image_name)

Check if an image tag exists on GHCR using the OCI distribution API.

def mz_image_tag_exists(image_tag: str, quiet: bool = False) -> bool:
    """Return whether the ``materialized`` image with ``image_tag`` exists.

    Lookup order:

    1. the in-process cache;
    2. the local Docker daemon;
    3. the remote registry chosen by ``image_registry()``: the GHCR manifest
       API, the Docker Hub tags API, or ``docker manifest inspect`` for any
       other registry.

    Definitive answers are cached in ``EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK``.
    An unrecognized Docker Hub API reply yields ``False`` without being cached.

    :param image_tag: tag of the ``materialized`` image, e.g. ``v0.114.0``.
    :param quiet: suppress progress and diagnostic output.
    """
    # Resolve the registry once so the cache key and the remote check agree.
    registry = image_registry()
    image_name = f"{registry}/materialized:{image_tag}"

    if image_name in EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK:
        image_exists = EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name]
        if not quiet:
            print(
                f"Status of image {image_name} known from earlier check: {'exists' if image_exists else 'does not exist'}"
            )
        return image_exists

    if not quiet:
        print(f"Checking existence of image manifest: {image_name}")

    command_local = ["docker", "images", "--quiet", image_name]
    output = subprocess.check_output(command_local, stderr=subprocess.STDOUT, text=True)
    if output:
        # Image is present locally, so there is no need to query a remote registry.
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True

    # Use registry-specific APIs to avoid slow `docker manifest inspect` calls
    # and Docker Hub rate limits.
    if registry.startswith("ghcr.io/"):
        return mz_image_tag_exists_ghcr(image_name, image_tag)

    if registry != "materialize":
        return mz_image_tag_exists_cmdline(image_name)

    # docker manifest inspect counts against the Docker Hub rate limits, even
    # when the image doesn't exist, see https://www.docker.com/increase-rate-limits/,
    # so use the API instead.
    try:
        response = requests.get(
            f"https://hub.docker.com/v2/repositories/materialize/materialized/tags/{image_tag}"
        )
        result = response.json()
    except requests.exceptions.RequestException:
        # RequestException covers connection failures and undecodable JSON
        # (requests.exceptions.JSONDecodeError subclasses it), as well as other
        # transport errors. Fall back to the slower CLI check, as the GHCR path does.
        return mz_image_tag_exists_cmdline(image_name)

    # Guard against a JSON reply that is not an object.
    payload = result if isinstance(result, dict) else {}
    if payload.get("images"):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = True
        return True
    if "not found" in str(payload.get("message", "")):
        EXISTENCE_OF_IMAGE_NAMES_FROM_EARLIER_CHECK[image_name] = False
        return False
    if not quiet:
        print(f"Failed to fetch image info from API: {result}")
    # do not cache the result of unknown error messages
    return False
def commit_to_image_tag(commit_hash: str) -> str:
    """Return the image tag for ``commit_hash``, resolving it on first use and memoizing it."""
    resolved_tag = _resolve_image_name_by_commit_hash(commit_hash)
    return resolved_tag
def release_version_to_image_tag(version: MzVersion) -> str:
    """Map a release version to its image tag, which is the version's string form."""
    image_tag = str(version)
    return image_tag
def is_image_tag_of_release_version(image_tag: str) -> bool:
    """Return ``True`` if ``image_tag`` looks like a release tag.

    ``latest``, legacy commit tags (``devel-`` prefix) and dev tags (containing
    the ``--`` metadata separator) are not release tags.
    """
    if image_tag == LATEST_IMAGE_TAG:
        return False
    if image_tag.startswith(LEGACY_IMAGE_TAG_COMMIT_PREFIX):
        return False
    return IMAGE_TAG_OF_DEV_VERSION_METADATA_SEPARATOR not in image_tag
def get_version_from_image_tag(image_tag: str) -> str:
    """Extract the leading version (``vX.Y.Z`` or ``vX.Y.Z-dev``) from an image tag.

    Fails an assertion when the tag does not start with such a version.
    """
    version_match = VERSION_IN_IMAGE_TAG_PATTERN.match(image_tag)
    assert version_match is not None, f"Invalid image tag: {image_tag}"
    return version_match[1]
def get_mz_version_from_image_tag(image_tag: str) -> MzVersion:
    """Parse the leading version of ``image_tag`` into an ``MzVersion``."""
    version_text = get_version_from_image_tag(image_tag)
    return MzVersion.parse_mz(version_text)
def image_registry() -> str:
    """Return the registry prefix used for Materialize images.

    GHCR (``ghcr.io/materializeinc/materialize``) is used when the ``MZ_GHCR``
    environment variable is truthy; otherwise Docker Hub's ``materialize``
    namespace is used. ``MZ_GHCR_DEFAULT`` is passed as the fallback value, so
    an unset ``MZ_GHCR`` means GHCR in CI and Docker Hub elsewhere. Previously
    "1" was hard-coded here, leaving ``MZ_GHCR_DEFAULT`` unused.
    """
    return (
        "ghcr.io/materializeinc/materialize"
        if ui.env_is_truthy("MZ_GHCR", MZ_GHCR_DEFAULT)
        else "materialize"
    )