misc.python.materialize.scratch

Utilities for launching and interacting with scratch EC2 instances.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Utilities for launching and interacting with scratch EC2 instances."""
 11
 12import asyncio
 13import csv
 14import datetime
 15import os
 16import shlex
 17import subprocess
 18import sys
 19from subprocess import CalledProcessError
 20from typing import NamedTuple, cast
 21
 22import boto3
 23from botocore.exceptions import ClientError
 24from mypy_boto3_ec2.literals import InstanceTypeType
 25from mypy_boto3_ec2.service_resource import Instance
 26from mypy_boto3_ec2.type_defs import (
 27    FilterTypeDef,
 28    InstanceNetworkInterfaceSpecificationTypeDef,
 29    InstanceTypeDef,
 30    RunInstancesRequestServiceResourceCreateInstancesTypeDef,
 31)
 32from prettytable import PrettyTable
 33from pydantic import BaseModel
 34
 35from materialize import MZ_ROOT, git, spawn, ui, util
 36
# Sane defaults for internal Materialize use in the scratch account
DEFAULT_SECURITY_GROUP_NAME = "scratch-security-group"
DEFAULT_INSTANCE_PROFILE_NAME = "admin-instance"

# Base argv for the `mssh` / `msftp` wrappers used by this module (SSH/SFTP
# via EC2 Instance Connect). Both pass `-o StrictHostKeyChecking=off`, i.e.
# host key checking is switched off for these connections.
SSH_COMMAND = ["mssh", "-o", "StrictHostKeyChecking=off"]
SFTP_COMMAND = ["msftp", "-o", "StrictHostKeyChecking=off"]

# Status-message printer created with the "scratch> " prefix.
say = ui.speaker("scratch> ")
 45
 46
 47def tags(i: Instance) -> dict[str, str]:
 48    if not i.tags:
 49        return {}
 50    return {t["Key"]: t["Value"] for t in i.tags}
 51
 52
 53def instance_typedef_tags(i: InstanceTypeDef) -> dict[str, str]:
 54    return {t["Key"]: t["Value"] for t in i.get("Tags", [])}
 55
 56
 57def name(tags: dict[str, str]) -> str | None:
 58    return tags.get("Name")
 59
 60
 61def launched_by(tags: dict[str, str]) -> str | None:
 62    return tags.get("LaunchedBy")
 63
 64
 65def ami_user(tags: dict[str, str]) -> str | None:
 66    return tags.get("ami-user", "ubuntu")
 67
 68
 69def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
 70    unix = tags.get("scratch-delete-after")
 71    if not unix:
 72        return None
 73    unix = int(float(unix))
 74    return datetime.datetime.fromtimestamp(unix)
 75
 76
 77def instance_host(instance: Instance, user: str | None = None) -> str:
 78    if user is None:
 79        user = ami_user(tags(instance))
 80    return f"{user}@{instance.id}"
 81
 82
 83def print_instances(ists: list[Instance], format: str) -> None:
 84    field_names = [
 85        "Name",
 86        "Instance ID",
 87        "Public IP Address",
 88        "Private IP Address",
 89        "Launched By",
 90        "Delete After",
 91        "State",
 92    ]
 93    rows = [
 94        [
 95            name(tags),
 96            i.instance_id,
 97            i.public_ip_address,
 98            i.private_ip_address,
 99            launched_by(tags),
100            delete_after(tags),
101            i.state["Name"],
102        ]
103        for (i, tags) in [(i, tags(i)) for i in ists]
104    ]
105    if format == "table":
106        pt = PrettyTable()
107        pt.field_names = field_names
108        pt.add_rows(rows)
109        print(pt)
110    elif format == "csv":
111        w = csv.writer(sys.stdout)
112        w.writerow(field_names)
113        w.writerows(rows)
114    else:
115        raise RuntimeError("Unknown format passed to print_instances")
116
117
def launch(
    *,
    key_name: str | None,
    instance_type: str,
    ami: str,
    ami_user: str,
    tags: dict[str, str],
    display_name: str | None = None,
    size_gb: int,
    security_group_name: str,
    instance_profile: str | None,
    nonce: str,
    delete_after: datetime.datetime,
) -> Instance:
    """Launch and configure an EC2 instance with the given properties.

    :param key_name: Optional EC2 key pair name; omitted from the request
        when falsy.
    :param instance_type: EC2 instance type string.
    :param ami: Image ID to boot.
    :param ami_user: Login user; stored in the "ami-user" tag.
    :param tags: Extra tags for the instance. The caller's dict is not
        modified.
    :param display_name: If set, stored in the "Name" tag.
    :param size_gb: Size of the `/dev/sda1` gp3 volume.
    :param security_group_name: Security group to attach. If no group with
        this name exists, one is created in the default VPC (creating the
        default VPC if needed) with TCP port 22 open to 0.0.0.0/0.
    :param instance_profile: Optional IAM instance profile name.
    :param nonce: Stored in the "nonce" tag.
    :param delete_after: Its `.timestamp()` is stored, as text, in the
        "scratch-delete-after" tag.
    :return: The newly created instance resource.
    """

    # Work on a copy so the bookkeeping tags added below do not leak into the
    # dict owned by the caller.
    tags = dict(tags)
    if display_name:
        tags["Name"] = display_name
    tags["scratch-delete-after"] = str(delete_after.timestamp())
    tags["nonce"] = nonce
    tags["git_ref"] = git.describe()
    tags["ami-user"] = ami_user

    ec2 = boto3.client("ec2")
    security_group_id = next(
        (
            group["GroupId"]
            for group in ec2.describe_security_groups()["SecurityGroups"]
            if group["GroupName"] == security_group_name
        ),
        None,
    )

    if security_group_id is None:
        # No such group yet: create it in the default VPC.
        vpc_id = next(
            (vpc["VpcId"] for vpc in ec2.describe_vpcs()["Vpcs"] if vpc["IsDefault"]),
            None,
        )
        if vpc_id is None:
            vpc_id = ec2.create_default_vpc()["Vpc"]["VpcId"]
        securitygroup = ec2.create_security_group(
            GroupName=security_group_name,
            Description="Allows all.",
            VpcId=vpc_id,
        )
        security_group_id = securitygroup["GroupId"]
        # Despite the description above, only inbound SSH (TCP 22) is opened.
        ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            CidrIp="0.0.0.0/0",
            IpProtocol="tcp",
            FromPort=22,
            ToPort=22,
        )

    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
        "AssociatePublicIpAddress": True,
        "DeviceIndex": 0,
        "Groups": [security_group_id],
    }

    say(f"launching instance {display_name or '(unnamed)'}")
    # The provisioning script is passed to the instance as user data.
    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
        provisioning_script = f.read()
    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
        "MinCount": 1,
        "MaxCount": 1,
        "ImageId": ami,
        "InstanceType": cast(InstanceTypeType, instance_type),
        "UserData": provisioning_script,
        "TagSpecifications": [
            {
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
            }
        ],
        "NetworkInterfaces": [network_interface],
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                    "VolumeSize": size_gb,
                    "VolumeType": "gp3",
                },
            }
        ],
        "MetadataOptions": {
            # Allow Docker containers to access IMDSv2.
            "HttpPutResponseHopLimit": 2,
        },
    }
    if key_name:
        kwargs["KeyName"] = key_name
    if instance_profile:
        kwargs["IamInstanceProfile"] = {"Name": instance_profile}

    return boto3.resource("ec2").create_instances(**kwargs)[0]
216
217
class CommandResult(NamedTuple):
    # Immutable (status, stdout, stderr) triple of strings.
    # NOTE(review): presumably the outcome of a command run on an instance;
    # nothing in the visible part of this module constructs or reads it --
    # confirm against callers.
    status: str
    stdout: str
    stderr: str
222
223
async def setup(
    i: Instance,
    git_rev: str,
) -> None:
    """Wait for instance `i` to be usable, then push the repository to it.

    Three phases, in order:

    1. Poll (`ui.async_timeout_loop(60, 5)`) until the instance has a public
       IP address and is in the "running" state.
    2. Poll (`ui.async_timeout_loop(300, 5)`) until `/opt/provision/done`
       exists on the instance, checked over SSH.
    3. Call `mkrepo(i, git_rev)`.

    :raises RuntimeError: If either polling phase runs out without success.
    """

    def is_ready(i: Instance) -> bool:
        if not i.public_ip_address or not i.state:
            return False
        return i.state.get("Name") == "running"

    async for remaining in ui.async_timeout_loop(60, 5):
        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
        try:
            i.reload()
            ready = is_ready(i)
        except ClientError:
            # Best effort: a failed refresh just means "not ready yet".
            ready = False
        if ready:
            break
    else:
        # The loop ran out without ever seeing a ready instance.
        raise RuntimeError(
            f"Instance {i} did not become ready in a reasonable amount of time"
        )

    async for remaining in ui.async_timeout_loop(300, 5):
        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
        try:
            mssh(i, "[[ -f /opt/provision/done ]]")
        except CalledProcessError:
            # Marker file not there (or SSH failed); try again.
            continue
        break
    else:
        raise RuntimeError(
            "Instance did not finish setup in a reasonable amount of time"
        )

    mkrepo(i, git_rev)
263
264
def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
    """Push local revision `rev` to instance `i` and check it out there.

    :param i: Target instance.
    :param rev: Revision to push; resolved locally with `git.rev_parse` first.
    :param init: When true, clone the Materialize repository on the instance
        before pushing.
    :param force: When true, add `--force` to the push.
    """
    if init:
        mssh(i, "git clone https://github.com/MaterializeInc/materialize.git")

    rev = git.rev_parse(rev)

    remote = f"{instance_host(i)}:materialize/.git"
    # Explicit refspec is required if the host repository is in detached
    # HEAD mode.
    refspec = f"{rev}:refs/heads/scratch"
    push_cmd: list[str] = [
        "git",
        "push",
        "--no-verify",
        remote,
        refspec,
        "--no-recurse-submodules",
        *(["--force"] if force else []),
    ]

    # Make git use the same SSH wrapper (and options) as the rest of this module.
    push_env = dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND))
    spawn.runv(push_cmd, cwd=MZ_ROOT, env=push_env)

    checkout_steps = [
        "cd materialize",
        "git config core.bare false",
        f"git checkout {rev}",
        "git submodule sync --recursive",
        "git submodule update --recursive",
    ]
    mssh(i, " && ".join(checkout_steps))
293
294
class MachineDesc(BaseModel):
    # Description of one machine to be started by `launch_cluster`.

    # Short machine name: suffix of the instance's display name
    # (`<nonce>-<name>`) and the host name written to /etc/hosts on every
    # instance of the cluster.
    name: str
    # Optional shell script; when set, `launch_cluster` starts it in the
    # background (`nohup bash -c ...`) from the `materialize` directory once
    # setup has finished.
    launch_script: str | None
    # Instance type and image ID, forwarded to `launch`.
    instance_type: str
    ami: str
    # Per-machine tags; `launch_cluster` merges its `extra_tags` over these.
    tags: dict[str, str] = {}
    # Root volume size, forwarded to `launch`.
    size_gb: int
    # When False, `launch_cluster` pushes "HEAD" instead of its `git_rev`.
    checkout: bool = True
    # Login user on the image, forwarded to `launch`.
    ami_user: str = "ubuntu"
304
305
def launch_cluster(
    descs: list[MachineDesc],
    *,
    nonce: str | None = None,
    key_name: str | None = None,
    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
    extra_tags: dict[str, str] | None = None,
    delete_after: datetime.datetime,
    git_rev: str = "HEAD",
    extra_env: dict[str, str] | None = None,
) -> list[Instance]:
    """Launch a cluster of instances with a given nonce.

    Each entry of `descs` is launched, all instances are set up (see
    `setup`), every instance gets the private IP and name of every cluster
    member appended to its /etc/hosts, and finally each machine's
    `launch_script`, if any, is started in the background.

    :param descs: The machines to launch.
    :param nonce: Shared identifier for the cluster; a fresh one is generated
        when omitted or empty.
    :param key_name: Optional EC2 key pair name.
    :param security_group_name: Security group for all instances.
    :param instance_profile: Optional IAM instance profile name.
    :param extra_tags: Tags applied to every instance, overriding per-machine
        tags of the same key. Defaults to no extra tags.
    :param delete_after: Expiry recorded on each instance.
    :param git_rev: Revision pushed to machines whose `checkout` is true;
        other machines get "HEAD".
    :param extra_env: Environment variables set for each launch script.
        Defaults to none.
    :return: The launched instances, in the order of `descs`.
    """

    # `None` defaults avoid sharing one mutable dict across calls.
    extra_tags = {} if extra_tags is None else extra_tags
    extra_env = {} if extra_env is None else extra_env

    if not nonce:
        nonce = util.nonce(8)

    instances = [
        launch(
            key_name=key_name,
            instance_type=d.instance_type,
            ami=d.ami,
            ami_user=d.ami_user,
            tags={**d.tags, **extra_tags},
            display_name=f"{nonce}-{d.name}",
            size_gb=d.size_gb,
            security_group_name=security_group_name,
            instance_profile=instance_profile,
            nonce=nonce,
            delete_after=delete_after,
        )
        for d in descs
    ]

    async def setup_all() -> None:
        await asyncio.gather(
            *(
                setup(i, git_rev if d.checkout else "HEAD")
                for (i, d) in zip(instances, descs)
            )
        )

    # `asyncio.run` owns the event loop's lifecycle; relying on
    # `asyncio.get_event_loop()` to create one implicitly is deprecated.
    asyncio.run(setup_all())

    hosts_str = "".join(
        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
    )
    for i in instances:
        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())

    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in extra_env.items())
    for i, d in zip(instances, descs):
        if d.launch_script:
            mssh(
                i,
                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
            )

    return instances
365
366
def whoami() -> str:
    """Return the second colon-separated field of the caller's STS `UserId`.

    NOTE(review): a `UserId` without a colon raises `IndexError` here.
    """
    identity = boto3.client("sts").get_caller_identity()
    user_id_fields = identity["UserId"].split(":")
    return user_id_fields[1]
369
370
def get_instance(name: str) -> Instance:
    """
    Look up an instance by instance id. The special name 'mine' resolves to
    the single pending-or-running instance whose LaunchedBy tag matches the
    caller, if there is exactly one; any other name is taken to be an
    instance id.
    :param name: The instance id or the special case 'mine'.
    :return: The instance to which the name refers.
    :raises RuntimeError: If 'mine' matches no instance or more than one.
    """
    if name != "mine":
        return boto3.resource("ec2").Instance(name)

    filters: list[FilterTypeDef] = [
        {"Name": "tag:LaunchedBy", "Values": [whoami()]},
        {"Name": "instance-state-name", "Values": ["pending", "running"]},
    ]
    owned = list(boto3.resource("ec2").instances.filter(Filters=filters))
    if len(owned) == 0:
        raise RuntimeError("can't understand 'mine': no owned instance?")
    if len(owned) > 1:
        owned_ids = ", ".join(inst.id for inst in owned)
        raise RuntimeError(
            f"can't understand 'mine': too many owned instances ({owned_ids})"
        )

    (only,) = owned
    say(f"understanding 'mine' as unique owned instance {only.id}")
    return only
395
396
def get_instances_by_tag(k: str, v: str) -> list[InstanceTypeDef]:
    """Return descriptions of all instances whose tag `k` equals `v`.

    The listing goes through the `describe_instances` paginator so that
    instances beyond the first page of results are not missed; the tag
    comparison itself is done client-side.

    :param k: Tag key to look up.
    :param v: Required tag value.
    :return: Matching instance description dicts.
    """
    pages = boto3.client("ec2").get_paginator("describe_instances").paginate()
    return [
        instance
        for page in pages
        for reservation in page["Reservations"]
        for instance in reservation["Instances"]
        if instance_typedef_tags(instance).get(k) == v
    ]
404
405
def get_old_instances() -> list[InstanceTypeDef]:
    """Return descriptions of non-terminated instances past their expiry.

    An instance counts as expired when it carries a "scratch-delete-after"
    tag whose value (a Unix timestamp as text) is in the past; instances
    without the tag are never returned. The listing goes through the
    `describe_instances` paginator so that instances beyond the first page
    of results are not missed.
    """

    def exists(i: InstanceTypeDef) -> bool:
        return i["State"]["Name"] != "terminated"

    def is_old(i: InstanceTypeDef) -> bool:
        delete_after = instance_typedef_tags(i).get("scratch-delete-after")
        if delete_after is None:
            return False
        # NOTE(review): `utcnow()` is naive, and `.timestamp()` interprets
        # naive values as local time, so "now" is shifted by the local UTC
        # offset on machines not running in UTC. Kept as-is because the
        # code that computes the tag value is not visible here and may rely
        # on the same convention -- confirm before changing.
        return datetime.datetime.utcnow().timestamp() > float(delete_after)

    pages = boto3.client("ec2").get_paginator("describe_instances").paginate()
    return [
        instance
        for page in pages
        for reservation in page["Reservations"]
        for instance in reservation["Instances"]
        if exists(instance) and is_old(instance)
    ]
423
424
def mssh(
    instance: Instance,
    command: str,
    *,
    extra_ssh_args: list[str] | None = None,
    input: bytes | None = None,
) -> None:
    """Runs a command over SSH via EC2 Instance Connect.

    :param instance: The instance to connect to.
    :param command: Shell command to run remotely. When empty, no command is
        echoed and an empty command argument is passed to `mssh`.
    :param extra_ssh_args: Additional arguments placed before the host.
        Defaults to none.
    :param input: Optional bytes fed to the process's stdin.
    :raises subprocess.CalledProcessError: If the process exits non-zero.
    """
    host = instance_host(instance)
    if command:
        print(f"{host}$ {command}", file=sys.stderr)
        # Quote to work around:
        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
        command = shlex.quote(command)
    else:
        print(f"$ mssh {host}")

    subprocess.run(
        [
            *SSH_COMMAND,
            # `None` default instead of a shared mutable list.
            *(extra_ssh_args or []),
            host,
            command,
        ],
        check=True,
        input=input,
    )
452
453
def msftp(
    instance: Instance,
) -> None:
    """Open an SFTP session to `instance` via EC2 Instance Connect."""
    target = instance_host(instance)
    sftp_argv = list(SFTP_COMMAND)
    sftp_argv.append(target)
    spawn.runv(sftp_argv)
DEFAULT_SECURITY_GROUP_NAME = 'scratch-security-group'
DEFAULT_INSTANCE_PROFILE_NAME = 'admin-instance'
SSH_COMMAND = ['mssh', '-o', 'StrictHostKeyChecking=off']
SFTP_COMMAND = ['msftp', '-o', 'StrictHostKeyChecking=off']
def say(msg: str) -> None:
55    def say(msg: str) -> None:
56        if not Verbosity.quiet:
57            print(f"{prefix}{msg}", file=sys.stderr)
def tags(i: mypy_boto3_ec2.service_resource.Instance) -> dict[str, str]:
48def tags(i: Instance) -> dict[str, str]:
49    if not i.tags:
50        return {}
51    return {t["Key"]: t["Value"] for t in i.tags}
def instance_typedef_tags(i: mypy_boto3_ec2.type_defs.InstanceTypeDef) -> dict[str, str]:
54def instance_typedef_tags(i: InstanceTypeDef) -> dict[str, str]:
55    return {t["Key"]: t["Value"] for t in i.get("Tags", [])}
def name(tags: dict[str, str]) -> str | None:
58def name(tags: dict[str, str]) -> str | None:
59    return tags.get("Name")
def launched_by(tags: dict[str, str]) -> str | None:
62def launched_by(tags: dict[str, str]) -> str | None:
63    return tags.get("LaunchedBy")
def ami_user(tags: dict[str, str]) -> str | None:
66def ami_user(tags: dict[str, str]) -> str | None:
67    return tags.get("ami-user", "ubuntu")
def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
70def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
71    unix = tags.get("scratch-delete-after")
72    if not unix:
73        return None
74    unix = int(float(unix))
75    return datetime.datetime.fromtimestamp(unix)
def instance_host( instance: mypy_boto3_ec2.service_resource.Instance, user: str | None = None) -> str:
78def instance_host(instance: Instance, user: str | None = None) -> str:
79    if user is None:
80        user = ami_user(tags(instance))
81    return f"{user}@{instance.id}"
def launch( *, key_name: str | None, instance_type: str, ami: str, ami_user: str, tags: dict[str, str], display_name: str | None = None, size_gb: int, security_group_name: str, instance_profile: str | None, nonce: str, delete_after: datetime.datetime) -> mypy_boto3_ec2.service_resource.Instance:
119def launch(
120    *,
121    key_name: str | None,
122    instance_type: str,
123    ami: str,
124    ami_user: str,
125    tags: dict[str, str],
126    display_name: str | None = None,
127    size_gb: int,
128    security_group_name: str,
129    instance_profile: str | None,
130    nonce: str,
131    delete_after: datetime.datetime,
132) -> Instance:
133    """Launch and configure an ec2 instance with the given properties."""
134
135    if display_name:
136        tags["Name"] = display_name
137    tags["scratch-delete-after"] = str(delete_after.timestamp())
138    tags["nonce"] = nonce
139    tags["git_ref"] = git.describe()
140    tags["ami-user"] = ami_user
141
142    ec2 = boto3.client("ec2")
143    groups = ec2.describe_security_groups()
144    security_group_id = None
145    for group in groups["SecurityGroups"]:
146        if group["GroupName"] == security_group_name:
147            security_group_id = group["GroupId"]
148            break
149
150    if security_group_id is None:
151        vpcs = ec2.describe_vpcs()
152        vpc_id = None
153        for vpc in vpcs["Vpcs"]:
154            if vpc["IsDefault"] == True:
155                vpc_id = vpc["VpcId"]
156                break
157        if vpc_id is None:
158            default_vpc = ec2.create_default_vpc()
159            vpc_id = default_vpc["Vpc"]["VpcId"]
160        securitygroup = ec2.create_security_group(
161            GroupName=security_group_name,
162            Description="Allows all.",
163            VpcId=vpc_id,
164        )
165        security_group_id = securitygroup["GroupId"]
166        ec2.authorize_security_group_ingress(
167            GroupId=security_group_id,
168            CidrIp="0.0.0.0/0",
169            IpProtocol="tcp",
170            FromPort=22,
171            ToPort=22,
172        )
173
174    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
175        "AssociatePublicIpAddress": True,
176        "DeviceIndex": 0,
177        "Groups": [security_group_id],
178    }
179
180    say(f"launching instance {display_name or '(unnamed)'}")
181    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
182        provisioning_script = f.read()
183    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
184        "MinCount": 1,
185        "MaxCount": 1,
186        "ImageId": ami,
187        "InstanceType": cast(InstanceTypeType, instance_type),
188        "UserData": provisioning_script,
189        "TagSpecifications": [
190            {
191                "ResourceType": "instance",
192                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
193            }
194        ],
195        "NetworkInterfaces": [network_interface],
196        "BlockDeviceMappings": [
197            {
198                "DeviceName": "/dev/sda1",
199                "Ebs": {
200                    "VolumeSize": size_gb,
201                    "VolumeType": "gp3",
202                },
203            }
204        ],
205        "MetadataOptions": {
206            # Allow Docker containers to access IMDSv2.
207            "HttpPutResponseHopLimit": 2,
208        },
209    }
210    if key_name:
211        kwargs["KeyName"] = key_name
212    if instance_profile:
213        kwargs["IamInstanceProfile"] = {"Name": instance_profile}
214    i = boto3.resource("ec2").create_instances(**kwargs)[0]
215
216    return i

Launch and configure an ec2 instance with the given properties.

class CommandResult(typing.NamedTuple):
219class CommandResult(NamedTuple):
220    status: str
221    stdout: str
222    stderr: str

CommandResult(status, stdout, stderr)

CommandResult(status: str, stdout: str, stderr: str)

Create new instance of CommandResult(status, stdout, stderr)

status: str

Alias for field number 0

stdout: str

Alias for field number 1

stderr: str

Alias for field number 2

async def setup(i: mypy_boto3_ec2.service_resource.Instance, git_rev: str) -> None:
225async def setup(
226    i: Instance,
227    git_rev: str,
228) -> None:
229    def is_ready(i: Instance) -> bool:
230        return bool(
231            i.public_ip_address and i.state and i.state.get("Name") == "running"
232        )
233
234    done = False
235    async for remaining in ui.async_timeout_loop(60, 5):
236        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
237        try:
238            i.reload()
239            if is_ready(i):
240                done = True
241                break
242        except ClientError:
243            pass
244    if not done:
245        raise RuntimeError(
246            f"Instance {i} did not become ready in a reasonable amount of time"
247        )
248
249    done = False
250    async for remaining in ui.async_timeout_loop(300, 5):
251        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
252        try:
253            mssh(i, "[[ -f /opt/provision/done ]]")
254            done = True
255            break
256        except CalledProcessError:
257            continue
258    if not done:
259        raise RuntimeError(
260            "Instance did not finish setup in a reasonable amount of time"
261        )
262
263    mkrepo(i, git_rev)
def mkrepo( i: mypy_boto3_ec2.service_resource.Instance, rev: str, init: bool = True, force: bool = False) -> None:
266def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
267    if init:
268        mssh(i, "git clone https://github.com/MaterializeInc/materialize.git")
269
270    rev = git.rev_parse(rev)
271
272    cmd: list[str] = [
273        "git",
274        "push",
275        "--no-verify",
276        f"{instance_host(i)}:materialize/.git",
277        # Explicit refspec is required if the host repository is in detached
278        # HEAD mode.
279        f"{rev}:refs/heads/scratch",
280        "--no-recurse-submodules",
281    ]
282    if force:
283        cmd.append("--force")
284
285    spawn.runv(
286        cmd,
287        cwd=MZ_ROOT,
288        env=dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND)),
289    )
290    mssh(
291        i,
292        f"cd materialize && git config core.bare false && git checkout {rev} && git submodule sync --recursive && git submodule update --recursive",
293    )
class MachineDesc(pydantic.main.BaseModel):
296class MachineDesc(BaseModel):
297    name: str
298    launch_script: str | None
299    instance_type: str
300    ami: str
301    tags: dict[str, str] = {}
302    size_gb: int
303    checkout: bool = True
304    ami_user: str = "ubuntu"

Usage docs: https://docs.pydantic.dev/2.8/concepts/models/

A base class for creating Pydantic models.

Attributes: __class_vars__: The names of classvars defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The signature for instantiating the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.

__pydantic_extra__: An instance attribute with the values of extra fields from validation when
    `model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
name: str
launch_script: str | None
instance_type: str
ami: str
tags: dict[str, str]
size_gb: int
checkout: bool
ami_user: str
model_config = {}
model_fields = {'name': FieldInfo(annotation=str, required=True), 'launch_script': FieldInfo(annotation=Union[str, NoneType], required=True), 'instance_type': FieldInfo(annotation=str, required=True), 'ami': FieldInfo(annotation=str, required=True), 'tags': FieldInfo(annotation=dict[str, str], required=False, default={}), 'size_gb': FieldInfo(annotation=int, required=True), 'checkout': FieldInfo(annotation=bool, required=False, default=True), 'ami_user': FieldInfo(annotation=str, required=False, default='ubuntu')}
model_computed_fields = {}
def launch_cluster( descs: list[MachineDesc], *, nonce: str | None = None, key_name: str | None = None, security_group_name: str = 'scratch-security-group', instance_profile: str | None = 'admin-instance', extra_tags: dict[str, str] = {}, delete_after: datetime.datetime, git_rev: str = 'HEAD', extra_env: dict[str, str] = {}) -> list[mypy_boto3_ec2.service_resource.Instance]:
307def launch_cluster(
308    descs: list[MachineDesc],
309    *,
310    nonce: str | None = None,
311    key_name: str | None = None,
312    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
313    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
314    extra_tags: dict[str, str] = {},
315    delete_after: datetime.datetime,
316    git_rev: str = "HEAD",
317    extra_env: dict[str, str] = {},
318) -> list[Instance]:
319    """Launch a cluster of instances with a given nonce"""
320
321    if not nonce:
322        nonce = util.nonce(8)
323
324    instances = [
325        launch(
326            key_name=key_name,
327            instance_type=d.instance_type,
328            ami=d.ami,
329            ami_user=d.ami_user,
330            tags={**d.tags, **extra_tags},
331            display_name=f"{nonce}-{d.name}",
332            size_gb=d.size_gb,
333            security_group_name=security_group_name,
334            instance_profile=instance_profile,
335            nonce=nonce,
336            delete_after=delete_after,
337        )
338        for d in descs
339    ]
340
341    loop = asyncio.get_event_loop()
342    loop.run_until_complete(
343        asyncio.gather(
344            *(
345                setup(i, git_rev if d.checkout else "HEAD")
346                for (i, d) in zip(instances, descs)
347            )
348        )
349    )
350
351    hosts_str = "".join(
352        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
353    )
354    for i in instances:
355        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())
356
357    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in extra_env.items())
358    for i, d in zip(instances, descs):
359        if d.launch_script:
360            mssh(
361                i,
362                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
363            )
364
365    return instances

Launch a cluster of instances with a given nonce

def whoami() -> str:
368def whoami() -> str:
369    return boto3.client("sts").get_caller_identity()["UserId"].split(":")[1]
def get_instance(name: str) -> mypy_boto3_ec2.service_resource.Instance:
372def get_instance(name: str) -> Instance:
373    """
374    Get an instance by instance id. The special name 'mine' resolves to a
375    unique running owned instance, if there is one; otherwise the name is
376    assumed to be an instance id.
377    :param name: The instance id or the special case 'mine'.
378    :return: The instance to which the name refers.
379    """
380    if name == "mine":
381        filters: list[FilterTypeDef] = [
382            {"Name": "tag:LaunchedBy", "Values": [whoami()]},
383            {"Name": "instance-state-name", "Values": ["pending", "running"]},
384        ]
385        instances = [i for i in boto3.resource("ec2").instances.filter(Filters=filters)]
386        if not instances:
387            raise RuntimeError("can't understand 'mine': no owned instance?")
388        if len(instances) > 1:
389            raise RuntimeError(
390                f"can't understand 'mine': too many owned instances ({', '.join(i.id for i in instances)})"
391            )
392        instance = instances[0]
393        say(f"understanding 'mine' as unique owned instance {instance.id}")
394        return instance
395    return boto3.resource("ec2").Instance(name)

Get an instance by instance id. The special name 'mine' resolves to a unique running owned instance, if there is one; otherwise the name is assumed to be an instance id.

Parameters
  • name: The instance id or the special case 'mine'.
Returns

The instance to which the name refers.

def get_instances_by_tag(k: str, v: str) -> list[mypy_boto3_ec2.type_defs.InstanceTypeDef]:
398def get_instances_by_tag(k: str, v: str) -> list[InstanceTypeDef]:
399    return [
400        i
401        for r in boto3.client("ec2").describe_instances()["Reservations"]
402        for i in r["Instances"]
403        if instance_typedef_tags(i).get(k) == v
404    ]
def get_old_instances() -> list[mypy_boto3_ec2.type_defs.InstanceTypeDef]:
407def get_old_instances() -> list[InstanceTypeDef]:
408    def exists(i: InstanceTypeDef) -> bool:
409        return i["State"]["Name"] != "terminated"
410
411    def is_old(i: InstanceTypeDef) -> bool:
412        delete_after = instance_typedef_tags(i).get("scratch-delete-after")
413        if delete_after is None:
414            return False
415        delete_after = float(delete_after)
416        return datetime.datetime.utcnow().timestamp() > delete_after
417
418    return [
419        i
420        for r in boto3.client("ec2").describe_instances()["Reservations"]
421        for i in r["Instances"]
422        if exists(i) and is_old(i)
423    ]
def mssh( instance: mypy_boto3_ec2.service_resource.Instance, command: str, *, extra_ssh_args: list[str] = [], input: bytes | None = None) -> None:
426def mssh(
427    instance: Instance,
428    command: str,
429    *,
430    extra_ssh_args: list[str] = [],
431    input: bytes | None = None,
432) -> None:
433    """Runs a command over SSH via EC2 Instance Connect."""
434    host = instance_host(instance)
435    if command:
436        print(f"{host}$ {command}", file=sys.stderr)
437        # Quote to work around:
438        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
439        command = shlex.quote(command)
440    else:
441        print(f"$ mssh {host}")
442
443    subprocess.run(
444        [
445            *SSH_COMMAND,
446            *extra_ssh_args,
447            host,
448            command,
449        ],
450        check=True,
451        input=input,
452    )

Runs a command over SSH via EC2 Instance Connect.

def msftp(instance: mypy_boto3_ec2.service_resource.Instance) -> None:
455def msftp(
456    instance: Instance,
457) -> None:
458    """Connects over SFTP via EC2 Instance Connect."""
459    host = instance_host(instance)
460    spawn.runv([*SFTP_COMMAND, host])

Connects over SFTP via EC2 Instance Connect.