misc.python.materialize.scratch

Utilities for launching and interacting with scratch EC2 instances.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""Utilities for launching and interacting with scratch EC2 instances."""
 11
 12import asyncio
 13import csv
 14import datetime
 15import os
 16import shlex
 17import subprocess
 18import sys
 19from subprocess import CalledProcessError
 20from typing import NamedTuple, cast
 21
 22import boto3
 23from botocore.exceptions import ClientError
 24from mypy_boto3_ec2.literals import InstanceTypeType
 25from mypy_boto3_ec2.service_resource import Instance
 26from mypy_boto3_ec2.type_defs import (
 27    FilterTypeDef,
 28    InstanceNetworkInterfaceSpecificationTypeDef,
 29    InstanceTypeDef,
 30    RunInstancesRequestServiceResourceCreateInstancesTypeDef,
 31)
 32from prettytable import PrettyTable
 33from pydantic import BaseModel
 34
 35from materialize import MZ_ROOT, git, spawn, ui, util
 36
# Sane defaults for internal Materialize use in the scratch account
DEFAULT_SECURITY_GROUP_NAME = "scratch-security-group"
DEFAULT_INSTANCE_PROFILE_NAME = "admin-instance"

# Base argument vectors for the `mssh` and `msftp` executables. Callers
# append any extra arguments and the target host. Both pass
# `-o StrictHostKeyChecking=off`.
SSH_COMMAND = ["mssh", "-o", "StrictHostKeyChecking=off"]
SFTP_COMMAND = ["msftp", "-o", "StrictHostKeyChecking=off"]

# Module-wide status printer created with the "scratch> " prefix.
say = ui.speaker("scratch> ")
 45
 46
 47def tags(i: Instance) -> dict[str, str]:
 48    if not i.tags:
 49        return {}
 50    return {t["Key"]: t["Value"] for t in i.tags}
 51
 52
 53def instance_typedef_tags(i: InstanceTypeDef) -> dict[str, str]:
 54    return {t["Key"]: t["Value"] for t in i.get("Tags", [])}
 55
 56
 57def name(tags: dict[str, str]) -> str | None:
 58    return tags.get("Name")
 59
 60
 61def launched_by(tags: dict[str, str]) -> str | None:
 62    return tags.get("LaunchedBy")
 63
 64
 65def ami_user(tags: dict[str, str]) -> str | None:
 66    return tags.get("ami-user", "ubuntu")
 67
 68
 69def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
 70    unix = tags.get("scratch-delete-after")
 71    if not unix:
 72        return None
 73    unix = int(float(unix))
 74    return datetime.datetime.fromtimestamp(unix)
 75
 76
 77def instance_host(instance: Instance, user: str | None = None) -> str:
 78    if user is None:
 79        user = ami_user(tags(instance))
 80    return f"{user}@{instance.id}"
 81
 82
 83def print_instances(ists: list[Instance], format: str) -> None:
 84    field_names = [
 85        "Name",
 86        "Instance ID",
 87        "Public IP Address",
 88        "Private IP Address",
 89        "Launched By",
 90        "Delete After",
 91        "State",
 92    ]
 93    rows = [
 94        [
 95            name(tags),
 96            i.instance_id,
 97            i.public_ip_address,
 98            i.private_ip_address,
 99            launched_by(tags),
100            delete_after(tags),
101            i.state["Name"],
102        ]
103        for (i, tags) in [(i, tags(i)) for i in ists]
104    ]
105    if format == "table":
106        pt = PrettyTable()
107        pt.field_names = field_names
108        pt.add_rows(rows)
109        print(pt)
110    elif format == "csv":
111        w = csv.writer(sys.stdout)
112        w.writerow(field_names)
113        w.writerows(rows)
114    else:
115        raise RuntimeError("Unknown format passed to print_instances")
116
117
def _find_or_create_security_group(ec2, security_group_name: str) -> str:
    """Return the ID of the security group with the given name, creating it if missing.

    `ec2` is a boto3 EC2 client. A newly created group is placed in the
    default VPC (which is created as well if the account has none) and gets
    a single ingress rule allowing TCP port 22 from 0.0.0.0/0.
    """
    for group in ec2.describe_security_groups()["SecurityGroups"]:
        if group["GroupName"] == security_group_name:
            return group["GroupId"]

    vpc_id = next(
        (vpc["VpcId"] for vpc in ec2.describe_vpcs()["Vpcs"] if vpc["IsDefault"]),
        None,
    )
    if vpc_id is None:
        vpc_id = ec2.create_default_vpc()["Vpc"]["VpcId"]

    security_group_id = ec2.create_security_group(
        GroupName=security_group_name,
        Description="Allows all.",
        VpcId=vpc_id,
    )["GroupId"]
    ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        CidrIp="0.0.0.0/0",
        IpProtocol="tcp",
        FromPort=22,
        ToPort=22,
    )
    return security_group_id


def launch(
    *,
    key_name: str | None,
    instance_type: str,
    ami: str,
    ami_user: str,
    tags: dict[str, str],
    display_name: str | None = None,
    size_gb: int,
    security_group_name: str,
    instance_profile: str | None,
    nonce: str,
    delete_after: datetime.datetime,
) -> Instance:
    """Launch and configure an ec2 instance with the given properties.

    The instance is tagged with the supplied `tags` plus `Name` (when
    `display_name` is given), `scratch-delete-after` (the timestamp of
    `delete_after`), `nonce`, `git_ref` (from `git.describe()`) and
    `ami-user`. The caller's `tags` dictionary is left unmodified.

    The security group named `security_group_name` is looked up and created
    on demand. The contents of `misc/scratch/provision.bash` are passed as
    the instance's user data. `key_name` and `instance_profile` are only
    included in the request when they are truthy.

    Returns the single instance created by the request.
    """

    # Work on a copy so the caller's dictionary is not modified.
    tags = dict(tags)
    if display_name:
        tags["Name"] = display_name
    tags["scratch-delete-after"] = str(delete_after.timestamp())
    tags["nonce"] = nonce
    tags["git_ref"] = git.describe()
    tags["ami-user"] = ami_user

    ec2 = boto3.client("ec2")
    security_group_id = _find_or_create_security_group(ec2, security_group_name)

    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
        "AssociatePublicIpAddress": True,
        "DeviceIndex": 0,
        "Groups": [security_group_id],
    }

    say(f"launching instance {display_name or '(unnamed)'}")
    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
        provisioning_script = f.read()
    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
        "MinCount": 1,
        "MaxCount": 1,
        "ImageId": ami,
        "InstanceType": cast(InstanceTypeType, instance_type),
        "UserData": provisioning_script,
        "TagSpecifications": [
            {
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
            }
        ],
        "NetworkInterfaces": [network_interface],
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                    "VolumeSize": size_gb,
                    "VolumeType": "gp3",
                },
            }
        ],
        "MetadataOptions": {
            # Allow Docker containers to access IMDSv2.
            "HttpPutResponseHopLimit": 2,
        },
    }
    if key_name:
        kwargs["KeyName"] = key_name
    if instance_profile:
        kwargs["IamInstanceProfile"] = {"Name": instance_profile}

    return boto3.resource("ec2").create_instances(**kwargs)[0]
216
217
class CommandResult(NamedTuple):
    """Immutable (status, stdout, stderr) triple of strings.

    NOTE(review): nothing in this module constructs or returns this type;
    presumably it describes the outcome of a remote command -- confirm
    against its users elsewhere.
    """

    # Field 0 of the tuple.
    status: str
    # Field 1 of the tuple.
    stdout: str
    # Field 2 of the tuple.
    stderr: str
222
223
async def setup(
    i: Instance,
    git_rev: str,
) -> None:
    """Wait for instance `i` to come up and finish provisioning, then set up its repo.

    Phase one polls (driven by `ui.async_timeout_loop(60, 5)`) until the
    instance reports a public IP address and the "running" state; a
    `ClientError` during a poll is ignored and the poll is retried. Phase two
    polls (driven by `ui.async_timeout_loop(300, 5)`) until
    `/opt/provision/done` exists on the instance. Either phase raises
    `RuntimeError` if its loop runs out. Finally `mkrepo(i, git_rev)` is
    called.

    NOTE(review): `mssh` and `mkrepo` are synchronous calls, so they appear
    to block the event loop while they run -- confirm whether that matters
    to callers that gather several `setup` coroutines.
    """
    async for remaining in ui.async_timeout_loop(60, 5):
        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
        try:
            i.reload()
            ready = bool(
                i.public_ip_address and i.state and i.state.get("Name") == "running"
            )
        except ClientError:
            continue
        if ready:
            break
    else:
        # The loop ran out without the instance ever reporting ready.
        raise RuntimeError(
            f"Instance {i} did not become ready in a reasonable amount of time"
        )

    async for remaining in ui.async_timeout_loop(300, 5):
        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
        try:
            mssh(i, "[[ -f /opt/provision/done ]]")
        except CalledProcessError:
            continue
        else:
            break
    else:
        # The loop ran out without the marker file ever appearing.
        raise RuntimeError(
            "Instance did not finish setup in a reasonable amount of time"
        )

    mkrepo(i, git_rev)
263
264
def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
    """Push revision `rev` of the local repository to instance `i` and check it out.

    With `init` set, the Materialize repository is first cloned on the
    instance. `rev` is resolved locally with `git.rev_parse`, pushed to the
    instance's `refs/heads/scratch` (with `--force` when `force` is set),
    then checked out on the instance and its submodules are synced and
    updated.
    """
    if init:
        clone_command = "git clone https://github.com/MaterializeInc/materialize.git --recurse-submodules"
        mssh(i, clone_command)

    commit = git.rev_parse(rev)

    remote = f"{instance_host(i)}:materialize/.git"
    # Explicit refspec is required if the host repository is in detached
    # HEAD mode.
    refspec = f"{commit}:refs/heads/scratch"
    push_argv: list[str] = [
        "git",
        "push",
        "--no-verify",
        remote,
        refspec,
        "--no-recurse-submodules",
        *(["--force"] if force else []),
    ]

    # Make git use the same SSH wrapper as the rest of this module.
    push_env = {**os.environ, "GIT_SSH_COMMAND": " ".join(SSH_COMMAND)}
    spawn.runv(push_argv, cwd=MZ_ROOT, env=push_env)

    remote_steps = [
        "cd materialize",
        "git config core.bare false",
        f"git checkout {commit}",
        "git submodule sync --recursive",
        "git submodule update --recursive",
    ]
    mssh(i, " && ".join(remote_steps))
296
297
class MachineDesc(BaseModel):
    """Description of one machine to be started by `launch_cluster`."""

    # Combined with the cluster nonce to form the instance's display name,
    # and written as the host alias into /etc/hosts on every instance.
    name: str
    # If set, run with `bash -c` from the `materialize` directory on the
    # instance once the cluster is up.
    launch_script: str | None
    # Passed to `launch` as the instance type.
    instance_type: str
    # Passed to `launch` as the image ID.
    ami: str
    # Per-machine tags; entries in `extra_tags` override these on key clash.
    tags: dict[str, str] = {}
    # Passed to `launch` as the volume size.
    size_gb: int
    # When False, `setup` is given "HEAD" instead of the requested `git_rev`.
    checkout: bool = True
    # Login user; stored in the instance's `ami-user` tag by `launch`.
    ami_user: str = "ubuntu"
307
308
def launch_cluster(
    descs: list[MachineDesc],
    *,
    nonce: str | None = None,
    key_name: str | None = None,
    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
    extra_tags: dict[str, str] = {},
    delete_after: datetime.datetime,
    git_rev: str = "HEAD",
    extra_env: dict[str, str] = {},
) -> list[Instance]:
    """Launch a cluster of instances with a given nonce

    One instance is launched per entry in `descs`, named `<nonce>-<name>`;
    a nonce is generated with `util.nonce(8)` when none is supplied. After
    every instance has gone through `setup`, each instance's `/etc/hosts` is
    appended with the private IP address and name of every machine, and each
    machine's `launch_script` (if any) is started in the background with the
    variables from `extra_env` set and its output sent to `mzscratch.log`.

    The `extra_tags` and `extra_env` defaults are shared dictionaries, but
    this function only reads them.

    Returns the launched instances in the same order as `descs`.
    """

    if not nonce:
        nonce = util.nonce(8)

    instances = [
        launch(
            key_name=key_name,
            instance_type=d.instance_type,
            ami=d.ami,
            ami_user=d.ami_user,
            tags={**d.tags, **extra_tags},
            display_name=f"{nonce}-{d.name}",
            size_gb=d.size_gb,
            security_group_name=security_group_name,
            instance_profile=instance_profile,
            nonce=nonce,
            delete_after=delete_after,
        )
        for d in descs
    ]

    async def _setup_all() -> None:
        await asyncio.gather(
            *(
                setup(i, git_rev if d.checkout else "HEAD")
                for (i, d) in zip(instances, descs)
            )
        )

    # asyncio.run manages its own event loop; asyncio.get_event_loop() is
    # deprecated when no loop is running and fails on recent Python versions.
    asyncio.run(_setup_all())

    hosts_str = "".join(
        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
    )
    for i in instances:
        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())

    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in extra_env.items())
    for i, d in zip(instances, descs):
        if d.launch_script:
            mssh(
                i,
                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
            )

    return instances
368
369
def whoami() -> str:
    """Return the second colon-separated field of the caller's STS `UserId`.

    NOTE(review): a `UserId` that contains no colon raises `IndexError`
    here -- confirm that every identity used with this tool has the
    `<id>:<name>` form.
    """
    identity = boto3.client("sts").get_caller_identity()
    user_id_fields = identity["UserId"].split(":")
    return user_id_fields[1]
372
373
def get_instance(name: str) -> Instance:
    """
    Look up an instance by instance id. The special name 'mine' resolves to
    the single pending or running instance whose LaunchedBy tag matches
    `whoami()`; any other name is treated as an instance id.
    :param name: The instance id or the special case 'mine'.
    :return: The instance to which the name refers.
    :raises RuntimeError: If 'mine' matches no instance or more than one.
    """
    if name != "mine":
        return boto3.resource("ec2").Instance(name)

    filters: list[FilterTypeDef] = [
        {"Name": "tag:LaunchedBy", "Values": [whoami()]},
        {"Name": "instance-state-name", "Values": ["pending", "running"]},
    ]
    owned = list(boto3.resource("ec2").instances.filter(Filters=filters))
    if len(owned) == 0:
        raise RuntimeError("can't understand 'mine': no owned instance?")
    if len(owned) > 1:
        owned_ids = ", ".join(inst.id for inst in owned)
        raise RuntimeError(
            f"can't understand 'mine': too many owned instances ({owned_ids})"
        )

    (only,) = owned
    say(f"understanding 'mine' as unique owned instance {only.id}")
    return only
398
399
def get_instances_by_tag(k: str, v: str) -> list[InstanceTypeDef]:
    """Return the descriptions of all instances whose tag `k` equals `v`.

    Every instance returned by `describe_instances` is examined; the tag
    comparison happens locally.
    """
    matches: list[InstanceTypeDef] = []
    for reservation in boto3.client("ec2").describe_instances()["Reservations"]:
        for described in reservation["Instances"]:
            if instance_typedef_tags(described).get(k) == v:
                matches.append(described)
    return matches
407
408
def get_old_instances() -> list[InstanceTypeDef]:
    """Return the descriptions of non-terminated instances past their deadline.

    An instance qualifies when its state is not "terminated" and it carries
    a `scratch-delete-after` tag whose numeric value is smaller than the
    current time as computed below. Instances without the tag are never
    returned.

    NOTE(review): `utcnow()` is a naive datetime, and `.timestamp()` reads a
    naive datetime as local time. The comparison is therefore only
    consistent with tags written the same way -- confirm against the code
    that produces `delete_after` before changing this.
    """

    def past_deadline(described: InstanceTypeDef) -> bool:
        raw = instance_typedef_tags(described).get("scratch-delete-after")
        if raw is None:
            return False
        deadline = float(raw)
        return datetime.datetime.utcnow().timestamp() > deadline

    stale: list[InstanceTypeDef] = []
    for reservation in boto3.client("ec2").describe_instances()["Reservations"]:
        for described in reservation["Instances"]:
            if described["State"]["Name"] == "terminated":
                continue
            if past_deadline(described):
                stale.append(described)
    return stale
426
427
def mssh(
    instance: Instance,
    command: str,
    *,
    extra_ssh_args: list[str] = [],
    input: bytes | None = None,
) -> None:
    """Runs a command over SSH via EC2 Instance Connect.

    The invocation is echoed first: to stderr when a command is given, to
    stdout when `command` is empty. `input`, if given, is passed to the
    process as its standard input. Raises `CalledProcessError` if the
    process exits with a non-zero status.
    """
    host = instance_host(instance)
    if not command:
        print(f"$ mssh {host}")
    else:
        print(f"{host}$ {command}", file=sys.stderr)
        # Quote to work around:
        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
        command = shlex.quote(command)

    argv = [*SSH_COMMAND, *extra_ssh_args, host, command]
    subprocess.run(argv, check=True, input=input)
455
456
def msftp(instance: Instance) -> None:
    """Connects over SFTP via EC2 Instance Connect.

    Runs the `msftp` command line from `SFTP_COMMAND` against the host
    string for `instance`.
    """
    target = instance_host(instance)
    argv = SFTP_COMMAND + [target]
    spawn.runv(argv)
DEFAULT_SECURITY_GROUP_NAME = 'scratch-security-group'
DEFAULT_INSTANCE_PROFILE_NAME = 'admin-instance'
SSH_COMMAND = ['mssh', '-o', 'StrictHostKeyChecking=off']
SFTP_COMMAND = ['msftp', '-o', 'StrictHostKeyChecking=off']
def say(msg: str) -> None:
55    def say(msg: str) -> None:
56        if not Verbosity.quiet:
57            print(f"{prefix}{msg}", file=sys.stderr)
def tags(i: mypy_boto3_ec2.service_resource.Instance) -> dict[str, str]:
48def tags(i: Instance) -> dict[str, str]:
49    if not i.tags:
50        return {}
51    return {t["Key"]: t["Value"] for t in i.tags}
def instance_typedef_tags(i: mypy_boto3_ec2.type_defs.InstanceTypeDef) -> dict[str, str]:
54def instance_typedef_tags(i: InstanceTypeDef) -> dict[str, str]:
55    return {t["Key"]: t["Value"] for t in i.get("Tags", [])}
def name(tags: dict[str, str]) -> str | None:
58def name(tags: dict[str, str]) -> str | None:
59    return tags.get("Name")
def launched_by(tags: dict[str, str]) -> str | None:
62def launched_by(tags: dict[str, str]) -> str | None:
63    return tags.get("LaunchedBy")
def ami_user(tags: dict[str, str]) -> str | None:
66def ami_user(tags: dict[str, str]) -> str | None:
67    return tags.get("ami-user", "ubuntu")
def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
70def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
71    unix = tags.get("scratch-delete-after")
72    if not unix:
73        return None
74    unix = int(float(unix))
75    return datetime.datetime.fromtimestamp(unix)
def instance_host( instance: mypy_boto3_ec2.service_resource.Instance, user: str | None = None) -> str:
78def instance_host(instance: Instance, user: str | None = None) -> str:
79    if user is None:
80        user = ami_user(tags(instance))
81    return f"{user}@{instance.id}"
def launch( *, key_name: str | None, instance_type: str, ami: str, ami_user: str, tags: dict[str, str], display_name: str | None = None, size_gb: int, security_group_name: str, instance_profile: str | None, nonce: str, delete_after: datetime.datetime) -> mypy_boto3_ec2.service_resource.Instance:
119def launch(
120    *,
121    key_name: str | None,
122    instance_type: str,
123    ami: str,
124    ami_user: str,
125    tags: dict[str, str],
126    display_name: str | None = None,
127    size_gb: int,
128    security_group_name: str,
129    instance_profile: str | None,
130    nonce: str,
131    delete_after: datetime.datetime,
132) -> Instance:
133    """Launch and configure an ec2 instance with the given properties."""
134
135    if display_name:
136        tags["Name"] = display_name
137    tags["scratch-delete-after"] = str(delete_after.timestamp())
138    tags["nonce"] = nonce
139    tags["git_ref"] = git.describe()
140    tags["ami-user"] = ami_user
141
142    ec2 = boto3.client("ec2")
143    groups = ec2.describe_security_groups()
144    security_group_id = None
145    for group in groups["SecurityGroups"]:
146        if group["GroupName"] == security_group_name:
147            security_group_id = group["GroupId"]
148            break
149
150    if security_group_id is None:
151        vpcs = ec2.describe_vpcs()
152        vpc_id = None
153        for vpc in vpcs["Vpcs"]:
154            if vpc["IsDefault"] == True:
155                vpc_id = vpc["VpcId"]
156                break
157        if vpc_id is None:
158            default_vpc = ec2.create_default_vpc()
159            vpc_id = default_vpc["Vpc"]["VpcId"]
160        securitygroup = ec2.create_security_group(
161            GroupName=security_group_name,
162            Description="Allows all.",
163            VpcId=vpc_id,
164        )
165        security_group_id = securitygroup["GroupId"]
166        ec2.authorize_security_group_ingress(
167            GroupId=security_group_id,
168            CidrIp="0.0.0.0/0",
169            IpProtocol="tcp",
170            FromPort=22,
171            ToPort=22,
172        )
173
174    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
175        "AssociatePublicIpAddress": True,
176        "DeviceIndex": 0,
177        "Groups": [security_group_id],
178    }
179
180    say(f"launching instance {display_name or '(unnamed)'}")
181    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
182        provisioning_script = f.read()
183    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
184        "MinCount": 1,
185        "MaxCount": 1,
186        "ImageId": ami,
187        "InstanceType": cast(InstanceTypeType, instance_type),
188        "UserData": provisioning_script,
189        "TagSpecifications": [
190            {
191                "ResourceType": "instance",
192                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
193            }
194        ],
195        "NetworkInterfaces": [network_interface],
196        "BlockDeviceMappings": [
197            {
198                "DeviceName": "/dev/sda1",
199                "Ebs": {
200                    "VolumeSize": size_gb,
201                    "VolumeType": "gp3",
202                },
203            }
204        ],
205        "MetadataOptions": {
206            # Allow Docker containers to access IMDSv2.
207            "HttpPutResponseHopLimit": 2,
208        },
209    }
210    if key_name:
211        kwargs["KeyName"] = key_name
212    if instance_profile:
213        kwargs["IamInstanceProfile"] = {"Name": instance_profile}
214    i = boto3.resource("ec2").create_instances(**kwargs)[0]
215
216    return i

Launch and configure an ec2 instance with the given properties.

class CommandResult(typing.NamedTuple):
219class CommandResult(NamedTuple):
220    status: str
221    stdout: str
222    stderr: str

CommandResult(status, stdout, stderr)

CommandResult(status: str, stdout: str, stderr: str)

Create new instance of CommandResult(status, stdout, stderr)

status: str

Alias for field number 0

stdout: str

Alias for field number 1

stderr: str

Alias for field number 2

async def setup(i: mypy_boto3_ec2.service_resource.Instance, git_rev: str) -> None:
225async def setup(
226    i: Instance,
227    git_rev: str,
228) -> None:
229    def is_ready(i: Instance) -> bool:
230        return bool(
231            i.public_ip_address and i.state and i.state.get("Name") == "running"
232        )
233
234    done = False
235    async for remaining in ui.async_timeout_loop(60, 5):
236        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
237        try:
238            i.reload()
239            if is_ready(i):
240                done = True
241                break
242        except ClientError:
243            pass
244    if not done:
245        raise RuntimeError(
246            f"Instance {i} did not become ready in a reasonable amount of time"
247        )
248
249    done = False
250    async for remaining in ui.async_timeout_loop(300, 5):
251        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
252        try:
253            mssh(i, "[[ -f /opt/provision/done ]]")
254            done = True
255            break
256        except CalledProcessError:
257            continue
258    if not done:
259        raise RuntimeError(
260            "Instance did not finish setup in a reasonable amount of time"
261        )
262
263    mkrepo(i, git_rev)
def mkrepo( i: mypy_boto3_ec2.service_resource.Instance, rev: str, init: bool = True, force: bool = False) -> None:
266def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
267    if init:
268        mssh(
269            i,
270            "git clone https://github.com/MaterializeInc/materialize.git --recurse-submodules",
271        )
272
273    rev = git.rev_parse(rev)
274
275    cmd: list[str] = [
276        "git",
277        "push",
278        "--no-verify",
279        f"{instance_host(i)}:materialize/.git",
280        # Explicit refspec is required if the host repository is in detached
281        # HEAD mode.
282        f"{rev}:refs/heads/scratch",
283        "--no-recurse-submodules",
284    ]
285    if force:
286        cmd.append("--force")
287
288    spawn.runv(
289        cmd,
290        cwd=MZ_ROOT,
291        env=dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND)),
292    )
293    mssh(
294        i,
295        f"cd materialize && git config core.bare false && git checkout {rev} && git submodule sync --recursive && git submodule update --recursive",
296    )
class MachineDesc(pydantic.main.BaseModel):
299class MachineDesc(BaseModel):
300    name: str
301    launch_script: str | None
302    instance_type: str
303    ami: str
304    tags: dict[str, str] = {}
305    size_gb: int
306    checkout: bool = True
307    ami_user: str = "ubuntu"

Usage documentation: Models

A base class for creating Pydantic models.

Attributes: __class_vars__: The names of the class variables defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
name: str
launch_script: str | None
instance_type: str
ami: str
tags: dict[str, str]
size_gb: int
checkout: bool
ami_user: str
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def launch_cluster( descs: list[MachineDesc], *, nonce: str | None = None, key_name: str | None = None, security_group_name: str = 'scratch-security-group', instance_profile: str | None = 'admin-instance', extra_tags: dict[str, str] = {}, delete_after: datetime.datetime, git_rev: str = 'HEAD', extra_env: dict[str, str] = {}) -> list[mypy_boto3_ec2.service_resource.Instance]:
310def launch_cluster(
311    descs: list[MachineDesc],
312    *,
313    nonce: str | None = None,
314    key_name: str | None = None,
315    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
316    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
317    extra_tags: dict[str, str] = {},
318    delete_after: datetime.datetime,
319    git_rev: str = "HEAD",
320    extra_env: dict[str, str] = {},
321) -> list[Instance]:
322    """Launch a cluster of instances with a given nonce"""
323
324    if not nonce:
325        nonce = util.nonce(8)
326
327    instances = [
328        launch(
329            key_name=key_name,
330            instance_type=d.instance_type,
331            ami=d.ami,
332            ami_user=d.ami_user,
333            tags={**d.tags, **extra_tags},
334            display_name=f"{nonce}-{d.name}",
335            size_gb=d.size_gb,
336            security_group_name=security_group_name,
337            instance_profile=instance_profile,
338            nonce=nonce,
339            delete_after=delete_after,
340        )
341        for d in descs
342    ]
343
344    loop = asyncio.get_event_loop()
345    loop.run_until_complete(
346        asyncio.gather(
347            *(
348                setup(i, git_rev if d.checkout else "HEAD")
349                for (i, d) in zip(instances, descs)
350            )
351        )
352    )
353
354    hosts_str = "".join(
355        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
356    )
357    for i in instances:
358        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())
359
360    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in extra_env.items())
361    for i, d in zip(instances, descs):
362        if d.launch_script:
363            mssh(
364                i,
365                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
366            )
367
368    return instances

Launch a cluster of instances with a given nonce

def whoami() -> str:
371def whoami() -> str:
372    return boto3.client("sts").get_caller_identity()["UserId"].split(":")[1]
def get_instance(name: str) -> mypy_boto3_ec2.service_resource.Instance:
375def get_instance(name: str) -> Instance:
376    """
377    Get an instance by instance id. The special name 'mine' resolves to a
378    unique running owned instance, if there is one; otherwise the name is
379    assumed to be an instance id.
380    :param name: The instance id or the special case 'mine'.
381    :return: The instance to which the name refers.
382    """
383    if name == "mine":
384        filters: list[FilterTypeDef] = [
385            {"Name": "tag:LaunchedBy", "Values": [whoami()]},
386            {"Name": "instance-state-name", "Values": ["pending", "running"]},
387        ]
388        instances = [i for i in boto3.resource("ec2").instances.filter(Filters=filters)]
389        if not instances:
390            raise RuntimeError("can't understand 'mine': no owned instance?")
391        if len(instances) > 1:
392            raise RuntimeError(
393                f"can't understand 'mine': too many owned instances ({', '.join(i.id for i in instances)})"
394            )
395        instance = instances[0]
396        say(f"understanding 'mine' as unique owned instance {instance.id}")
397        return instance
398    return boto3.resource("ec2").Instance(name)

Get an instance by instance id. The special name 'mine' resolves to a unique running owned instance, if there is one; otherwise the name is assumed to be an instance id.

Parameters
  • name: The instance id or the special case 'mine'.
Returns

The instance to which the name refers.

def get_instances_by_tag(k: str, v: str) -> list[mypy_boto3_ec2.type_defs.InstanceTypeDef]:
401def get_instances_by_tag(k: str, v: str) -> list[InstanceTypeDef]:
402    return [
403        i
404        for r in boto3.client("ec2").describe_instances()["Reservations"]
405        for i in r["Instances"]
406        if instance_typedef_tags(i).get(k) == v
407    ]
def get_old_instances() -> list[mypy_boto3_ec2.type_defs.InstanceTypeDef]:
410def get_old_instances() -> list[InstanceTypeDef]:
411    def exists(i: InstanceTypeDef) -> bool:
412        return i["State"]["Name"] != "terminated"
413
414    def is_old(i: InstanceTypeDef) -> bool:
415        delete_after = instance_typedef_tags(i).get("scratch-delete-after")
416        if delete_after is None:
417            return False
418        delete_after = float(delete_after)
419        return datetime.datetime.utcnow().timestamp() > delete_after
420
421    return [
422        i
423        for r in boto3.client("ec2").describe_instances()["Reservations"]
424        for i in r["Instances"]
425        if exists(i) and is_old(i)
426    ]
def mssh( instance: mypy_boto3_ec2.service_resource.Instance, command: str, *, extra_ssh_args: list[str] = [], input: bytes | None = None) -> None:
429def mssh(
430    instance: Instance,
431    command: str,
432    *,
433    extra_ssh_args: list[str] = [],
434    input: bytes | None = None,
435) -> None:
436    """Runs a command over SSH via EC2 Instance Connect."""
437    host = instance_host(instance)
438    if command:
439        print(f"{host}$ {command}", file=sys.stderr)
440        # Quote to work around:
441        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
442        command = shlex.quote(command)
443    else:
444        print(f"$ mssh {host}")
445
446    subprocess.run(
447        [
448            *SSH_COMMAND,
449            *extra_ssh_args,
450            host,
451            command,
452        ],
453        check=True,
454        input=input,
455    )

Runs a command over SSH via EC2 Instance Connect.

def msftp(instance: mypy_boto3_ec2.service_resource.Instance) -> None:
458def msftp(
459    instance: Instance,
460) -> None:
461    """Connects over SFTP via EC2 Instance Connect."""
462    host = instance_host(instance)
463    spawn.runv([*SFTP_COMMAND, host])

Connects over SFTP via EC2 Instance Connect.