misc.python.materialize.scratch
Utilities for launching and interacting with scratch EC2 instances.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for launching and interacting with scratch EC2 instances."""

import asyncio
import csv
import datetime
import os
import shlex
import subprocess
import sys
from subprocess import CalledProcessError
from typing import NamedTuple, cast

import boto3
from botocore.exceptions import ClientError
from mypy_boto3_ec2.literals import InstanceTypeType
from mypy_boto3_ec2.service_resource import Instance
from mypy_boto3_ec2.type_defs import (
    FilterTypeDef,
    InstanceNetworkInterfaceSpecificationTypeDef,
    InstanceTypeDef,
    RunInstancesRequestServiceResourceCreateInstancesTypeDef,
)
from prettytable import PrettyTable
from pydantic import BaseModel

from materialize import MZ_ROOT, git, spawn, ui, util

# Sane defaults for internal Materialize use in the scratch account
DEFAULT_SECURITY_GROUP_NAME = "scratch-security-group"
DEFAULT_INSTANCE_PROFILE_NAME = "admin-instance"

SSH_COMMAND = ["mssh", "-o", "StrictHostKeyChecking=off"]
SFTP_COMMAND = ["msftp", "-o", "StrictHostKeyChecking=off"]

say = ui.speaker("scratch> ")


def tags(i: Instance) -> dict[str, str]:
    """Return the tags of an EC2 instance resource as a plain dict."""
    if not i.tags:
        return {}
    return {t["Key"]: t["Value"] for t in i.tags}


def instance_typedef_tags(i: InstanceTypeDef) -> dict[str, str]:
    """Return the tags of a ``describe_instances`` record as a plain dict."""
    return {t["Key"]: t["Value"] for t in i.get("Tags", [])}


def name(tags: dict[str, str]) -> str | None:
    """Return the value of the ``Name`` tag, if present."""
    return tags.get("Name")


def launched_by(tags: dict[str, str]) -> str | None:
    """Return the value of the ``LaunchedBy`` tag, if present."""
    return tags.get("LaunchedBy")


def ami_user(tags: dict[str, str]) -> str | None:
    """Return the value of the ``ami-user`` tag, defaulting to ``"ubuntu"``."""
    return tags.get("ami-user", "ubuntu")


def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
    """Parse the ``scratch-delete-after`` tag into a naive datetime.

    Returns None when the tag is missing or empty. The tag is read as a Unix
    timestamp; fractional seconds are truncated.
    """
    unix = tags.get("scratch-delete-after")
    if not unix:
        return None
    return datetime.datetime.fromtimestamp(int(float(unix)))


def instance_host(instance: Instance, user: str | None = None) -> str:
    """Return the ``user@instance-id`` string used to address an instance.

    When ``user`` is None it is taken from the instance's ``ami-user`` tag.
    """
    if user is None:
        user = ami_user(tags(instance))
    return f"{user}@{instance.id}"


def print_instances(ists: list[Instance], format: str) -> None:
    """Print a summary of the given instances to stdout.

    :param ists: The instances to describe.
    :param format: Either ``"table"`` or ``"csv"``.
    :raises RuntimeError: If ``format`` is neither of the supported values.
    """
    field_names = [
        "Name",
        "Instance ID",
        "Public IP Address",
        "Private IP Address",
        "Launched By",
        "Delete After",
        "State",
    ]
    # The loop variable is named `inst_tags` so that it does not shadow the
    # module-level `tags()` helper.
    rows = [
        [
            name(inst_tags),
            i.instance_id,
            i.public_ip_address,
            i.private_ip_address,
            launched_by(inst_tags),
            delete_after(inst_tags),
            i.state["Name"],
        ]
        for (i, inst_tags) in [(i, tags(i)) for i in ists]
    ]
    if format == "table":
        pt = PrettyTable()
        pt.field_names = field_names
        pt.add_rows(rows)
        print(pt)
    elif format == "csv":
        w = csv.writer(sys.stdout)
        w.writerow(field_names)
        w.writerows(rows)
    else:
        raise RuntimeError("Unknown format passed to print_instances")


def _ensure_security_group(ec2, security_group_name: str) -> str:
    """Return the ID of the named security group, creating it if missing.

    A newly created group is placed in the default VPC (which is itself
    created when the account has none) and allows inbound TCP port 22 from
    any address.
    """
    groups = ec2.describe_security_groups()
    for group in groups["SecurityGroups"]:
        if group["GroupName"] == security_group_name:
            return group["GroupId"]

    vpc_id = None
    for vpc in ec2.describe_vpcs()["Vpcs"]:
        if vpc["IsDefault"]:
            vpc_id = vpc["VpcId"]
            break
    if vpc_id is None:
        vpc_id = ec2.create_default_vpc()["Vpc"]["VpcId"]

    security_group_id = ec2.create_security_group(
        GroupName=security_group_name,
        Description="Allows all.",
        VpcId=vpc_id,
    )["GroupId"]
    ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        CidrIp="0.0.0.0/0",
        IpProtocol="tcp",
        FromPort=22,
        ToPort=22,
    )
    return security_group_id


def launch(
    *,
    key_name: str | None,
    instance_type: str,
    ami: str,
    ami_user: str,
    tags: dict[str, str],
    display_name: str | None = None,
    size_gb: int,
    security_group_name: str,
    instance_profile: str | None,
    nonce: str,
    delete_after: datetime.datetime,
) -> Instance:
    """Launch and configure an ec2 instance with the given properties.

    :param key_name: Name of the EC2 key pair to attach, if any.
    :param instance_type: The EC2 instance type.
    :param ami: The image ID to boot.
    :param ami_user: Login user, recorded in the ``ami-user`` tag.
    :param tags: Extra tags for the instance. The dict is not modified.
    :param display_name: Value for the ``Name`` tag, if any.
    :param size_gb: Size of the ``/dev/sda1`` gp3 volume.
    :param security_group_name: Security group to use; created if missing.
    :param instance_profile: Name of the IAM instance profile, if any.
    :param nonce: Recorded in the ``nonce`` tag.
    :param delete_after: Recorded as a Unix timestamp in the
        ``scratch-delete-after`` tag.
    :return: The newly created instance resource.
    """
    # Work on a copy so the caller's dict is not polluted with our tags.
    tags = dict(tags)
    if display_name:
        tags["Name"] = display_name
    # NOTE(review): timestamp() interprets a naive datetime as local time;
    # confirm which convention callers use for `delete_after`.
    tags["scratch-delete-after"] = str(delete_after.timestamp())
    tags["nonce"] = nonce
    tags["git_ref"] = git.describe()
    tags["ami-user"] = ami_user

    ec2 = boto3.client("ec2")
    security_group_id = _ensure_security_group(ec2, security_group_name)

    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
        "AssociatePublicIpAddress": True,
        "DeviceIndex": 0,
        "Groups": [security_group_id],
    }

    say(f"launching instance {display_name or '(unnamed)'}")
    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
        provisioning_script = f.read()
    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
        "MinCount": 1,
        "MaxCount": 1,
        "ImageId": ami,
        "InstanceType": cast(InstanceTypeType, instance_type),
        "UserData": provisioning_script,
        "TagSpecifications": [
            {
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
            }
        ],
        "NetworkInterfaces": [network_interface],
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                    "VolumeSize": size_gb,
                    "VolumeType": "gp3",
                },
            }
        ],
        "MetadataOptions": {
            # Allow Docker containers to access IMDSv2.
            "HttpPutResponseHopLimit": 2,
        },
    }
    if key_name:
        kwargs["KeyName"] = key_name
    if instance_profile:
        kwargs["IamInstanceProfile"] = {"Name": instance_profile}
    return boto3.resource("ec2").create_instances(**kwargs)[0]


class CommandResult(NamedTuple):
    status: str
    stdout: str
    stderr: str


async def setup(
    i: Instance,
    git_rev: str,
) -> None:
    """Wait for an instance to boot and provision, then push the repo to it.

    :param i: The instance to set up.
    :param git_rev: The revision handed to ``mkrepo``.
    :raises RuntimeError: If the instance is not running with a public IP
        within the first polling window, or provisioning does not finish
        within the second.
    """

    def is_ready(i: Instance) -> bool:
        return bool(
            i.public_ip_address and i.state and i.state.get("Name") == "running"
        )

    async for remaining in ui.async_timeout_loop(60, 5):
        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
        try:
            i.reload()
            if is_ready(i):
                break
        except ClientError:
            # Best effort: retry on the next iteration.
            pass
    else:
        raise RuntimeError(
            f"Instance {i} did not become ready in a reasonable amount of time"
        )

    async for remaining in ui.async_timeout_loop(300, 5):
        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
        try:
            mssh(i, "[[ -f /opt/provision/done ]]")
            break
        except CalledProcessError:
            continue
    else:
        raise RuntimeError(
            "Instance did not finish setup in a reasonable amount of time"
        )

    mkrepo(i, git_rev)


def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
    """Push a revision of the local repository to an instance and check it out.

    :param i: The target instance.
    :param rev: The revision to push; resolved with ``git.rev_parse``.
    :param init: Whether to first clone the repository on the instance.
    :param force: Whether to pass ``--force`` to ``git push``.
    """
    if init:
        mssh(
            i,
            "git clone https://github.com/MaterializeInc/materialize.git --recurse-submodules",
        )

    rev = git.rev_parse(rev)

    cmd: list[str] = [
        "git",
        "push",
        "--no-verify",
        f"{instance_host(i)}:materialize/.git",
        # Explicit refspec is required if the host repository is in detached
        # HEAD mode.
        f"{rev}:refs/heads/scratch",
        "--no-recurse-submodules",
    ]
    if force:
        cmd.append("--force")

    spawn.runv(
        cmd,
        cwd=MZ_ROOT,
        env=dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND)),
    )
    mssh(
        i,
        f"cd materialize && git config core.bare false && git checkout {rev} && git submodule sync --recursive && git submodule update --recursive",
    )


class MachineDesc(BaseModel):
    name: str
    launch_script: str | None
    instance_type: str
    ami: str
    tags: dict[str, str] = {}
    size_gb: int
    checkout: bool = True
    ami_user: str = "ubuntu"


def launch_cluster(
    descs: list[MachineDesc],
    *,
    nonce: str | None = None,
    key_name: str | None = None,
    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
    extra_tags: dict[str, str] | None = None,
    delete_after: datetime.datetime,
    git_rev: str = "HEAD",
    extra_env: dict[str, str] | None = None,
) -> list[Instance]:
    """Launch a cluster of instances with a given nonce.

    :param descs: One description per machine to launch.
    :param nonce: Shared identifier; generated with ``util.nonce(8)`` if
        not supplied.
    :param key_name: Name of the EC2 key pair to attach, if any.
    :param security_group_name: Security group for every instance.
    :param instance_profile: IAM instance profile name, if any.
    :param extra_tags: Tags applied to every instance; they override the
        per-machine tags on conflict.
    :param delete_after: Recorded on each instance for later cleanup.
    :param git_rev: Revision pushed to machines whose ``checkout`` is true;
        the others receive ``"HEAD"``.
    :param extra_env: Environment variables set for each launch script.
    :return: The launched instances, in the same order as ``descs``.
    """
    if not nonce:
        nonce = util.nonce(8)
    extra_tags = extra_tags or {}
    extra_env = extra_env or {}

    instances = [
        launch(
            key_name=key_name,
            instance_type=d.instance_type,
            ami=d.ami,
            ami_user=d.ami_user,
            tags={**d.tags, **extra_tags},
            display_name=f"{nonce}-{d.name}",
            size_gb=d.size_gb,
            security_group_name=security_group_name,
            instance_profile=instance_profile,
            nonce=nonce,
            delete_after=delete_after,
        )
        for d in descs
    ]

    async def setup_all() -> None:
        await asyncio.gather(
            *(
                setup(i, git_rev if d.checkout else "HEAD")
                for (i, d) in zip(instances, descs)
            )
        )

    # asyncio.run replaces the deprecated get_event_loop/run_until_complete.
    asyncio.run(setup_all())

    # Let every machine resolve every other machine by its short name.
    hosts_str = "".join(
        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
    )
    for i in instances:
        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())

    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in extra_env.items())
    for i, d in zip(instances, descs):
        if d.launch_script:
            mssh(
                i,
                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
            )

    return instances


def whoami() -> str:
    """Return the part of the caller's STS ``UserId`` after the first colon."""
    return boto3.client("sts").get_caller_identity()["UserId"].split(":")[1]


def get_instance(name: str) -> Instance:
    """
    Get an instance by instance id. The special name 'mine' resolves to a
    unique running owned instance, if there is one; otherwise the name is
    assumed to be an instance id.
    :param name: The instance id or the special case 'mine'.
    :return: The instance to which the name refers.
    """
    if name == "mine":
        filters: list[FilterTypeDef] = [
            {"Name": "tag:LaunchedBy", "Values": [whoami()]},
            {"Name": "instance-state-name", "Values": ["pending", "running"]},
        ]
        instances = list(boto3.resource("ec2").instances.filter(Filters=filters))
        if not instances:
            raise RuntimeError("can't understand 'mine': no owned instance?")
        if len(instances) > 1:
            raise RuntimeError(
                f"can't understand 'mine': too many owned instances ({', '.join(i.id for i in instances)})"
            )
        instance = instances[0]
        say(f"understanding 'mine' as unique owned instance {instance.id}")
        return instance
    return boto3.resource("ec2").Instance(name)


def get_instances_by_tag(k: str, v: str) -> list[InstanceTypeDef]:
    """Return every instance record whose tag ``k`` has the value ``v``."""
    return [
        i
        for r in boto3.client("ec2").describe_instances()["Reservations"]
        for i in r["Instances"]
        if instance_typedef_tags(i).get(k) == v
    ]


def get_old_instances() -> list[InstanceTypeDef]:
    """Return non-terminated instances whose delete-after time has passed.

    Instances without a ``scratch-delete-after`` tag are never returned.
    """

    def exists(i: InstanceTypeDef) -> bool:
        return i["State"]["Name"] != "terminated"

    def is_old(i: InstanceTypeDef) -> bool:
        delete_after = instance_typedef_tags(i).get("scratch-delete-after")
        if delete_after is None:
            return False
        # Same value as the deprecated utcnow(): the current UTC wall clock
        # as a naive datetime.
        # NOTE(review): timestamp() then treats that naive value as local
        # time; confirm this matches how the tag is written before changing.
        naive_utc_now = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        return naive_utc_now.timestamp() > float(delete_after)

    return [
        i
        for r in boto3.client("ec2").describe_instances()["Reservations"]
        for i in r["Instances"]
        if exists(i) and is_old(i)
    ]


def mssh(
    instance: Instance,
    command: str,
    *,
    extra_ssh_args: list[str] | None = None,
    input: bytes | None = None,
) -> None:
    """Runs a command over SSH via EC2 Instance Connect.

    :param instance: The instance to connect to.
    :param command: The remote command; an empty string is passed through
        unquoted.
    :param extra_ssh_args: Extra arguments placed before the host.
    :param input: Bytes fed to the process's standard input, if any.
    :raises CalledProcessError: If the process exits with a non-zero status.
    """
    host = instance_host(instance)
    if command:
        print(f"{host}$ {command}", file=sys.stderr)
        # Quote to work around:
        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
        command = shlex.quote(command)
    else:
        print(f"$ mssh {host}")

    subprocess.run(
        [
            *SSH_COMMAND,
            *(extra_ssh_args or []),
            host,
            command,
        ],
        check=True,
        input=input,
    )


def msftp(
    instance: Instance,
) -> None:
    """Connects over SFTP via EC2 Instance Connect."""
    host = instance_host(instance)
    spawn.runv([*SFTP_COMMAND, host])
def print_instances(ists: list[Instance], format: str) -> None:
    """Write a one-row-per-instance summary to stdout.

    :param ists: The instances to summarize.
    :param format: ``"table"`` for a PrettyTable rendering, ``"csv"`` for CSV.
    :raises RuntimeError: If ``format`` is anything else.
    """
    header = [
        "Name",
        "Instance ID",
        "Public IP Address",
        "Private IP Address",
        "Launched By",
        "Delete After",
        "State",
    ]

    # Resolve the tags of every instance first, then assemble the rows.
    tagged = [(inst, tags(inst)) for inst in ists]
    body = []
    for inst, inst_tags in tagged:
        body.append(
            [
                name(inst_tags),
                inst.instance_id,
                inst.public_ip_address,
                inst.private_ip_address,
                launched_by(inst_tags),
                delete_after(inst_tags),
                inst.state["Name"],
            ]
        )

    if format == "csv":
        writer = csv.writer(sys.stdout)
        writer.writerow(header)
        writer.writerows(body)
        return
    if format == "table":
        table = PrettyTable()
        table.field_names = header
        table.add_rows(body)
        print(table)
        return
    raise RuntimeError("Unknown format passed to print_instances")
def _ensure_security_group(ec2, security_group_name: str) -> str:
    """Return the ID of the named security group, creating it if missing.

    A newly created group is placed in the default VPC (which is itself
    created when the account has none) and allows inbound TCP port 22 from
    any address.
    """
    groups = ec2.describe_security_groups()
    for group in groups["SecurityGroups"]:
        if group["GroupName"] == security_group_name:
            return group["GroupId"]

    vpc_id = None
    for vpc in ec2.describe_vpcs()["Vpcs"]:
        if vpc["IsDefault"]:
            vpc_id = vpc["VpcId"]
            break
    if vpc_id is None:
        vpc_id = ec2.create_default_vpc()["Vpc"]["VpcId"]

    security_group_id = ec2.create_security_group(
        GroupName=security_group_name,
        Description="Allows all.",
        VpcId=vpc_id,
    )["GroupId"]
    ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        CidrIp="0.0.0.0/0",
        IpProtocol="tcp",
        FromPort=22,
        ToPort=22,
    )
    return security_group_id


def launch(
    *,
    key_name: str | None,
    instance_type: str,
    ami: str,
    ami_user: str,
    tags: dict[str, str],
    display_name: str | None = None,
    size_gb: int,
    security_group_name: str,
    instance_profile: str | None,
    nonce: str,
    delete_after: datetime.datetime,
) -> Instance:
    """Launch and configure an ec2 instance with the given properties.

    :param key_name: Name of the EC2 key pair to attach, if any.
    :param instance_type: The EC2 instance type.
    :param ami: The image ID to boot.
    :param ami_user: Login user, recorded in the ``ami-user`` tag.
    :param tags: Extra tags for the instance. The dict is not modified.
    :param display_name: Value for the ``Name`` tag, if any.
    :param size_gb: Size of the ``/dev/sda1`` gp3 volume.
    :param security_group_name: Security group to use; created if missing.
    :param instance_profile: Name of the IAM instance profile, if any.
    :param nonce: Recorded in the ``nonce`` tag.
    :param delete_after: Recorded as a Unix timestamp in the
        ``scratch-delete-after`` tag.
    :return: The newly created instance resource.
    """
    # Work on a copy so the caller's dict is not polluted with our tags.
    tags = dict(tags)
    if display_name:
        tags["Name"] = display_name
    # NOTE(review): timestamp() interprets a naive datetime as local time;
    # confirm which convention callers use for `delete_after`.
    tags["scratch-delete-after"] = str(delete_after.timestamp())
    tags["nonce"] = nonce
    tags["git_ref"] = git.describe()
    tags["ami-user"] = ami_user

    ec2 = boto3.client("ec2")
    security_group_id = _ensure_security_group(ec2, security_group_name)

    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
        "AssociatePublicIpAddress": True,
        "DeviceIndex": 0,
        "Groups": [security_group_id],
    }

    say(f"launching instance {display_name or '(unnamed)'}")
    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
        provisioning_script = f.read()
    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
        "MinCount": 1,
        "MaxCount": 1,
        "ImageId": ami,
        "InstanceType": cast(InstanceTypeType, instance_type),
        "UserData": provisioning_script,
        "TagSpecifications": [
            {
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
            }
        ],
        "NetworkInterfaces": [network_interface],
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                    "VolumeSize": size_gb,
                    "VolumeType": "gp3",
                },
            }
        ],
        "MetadataOptions": {
            # Allow Docker containers to access IMDSv2.
            "HttpPutResponseHopLimit": 2,
        },
    }
    if key_name:
        kwargs["KeyName"] = key_name
    if instance_profile:
        kwargs["IamInstanceProfile"] = {"Name": instance_profile}
    return boto3.resource("ec2").create_instances(**kwargs)[0]
Launch and configure an EC2 instance with the given properties.
CommandResult(status, stdout, stderr)
async def setup(
    i: Instance,
    git_rev: str,
) -> None:
    """Wait until an instance is usable, then push the repository to it.

    First polls the instance until it reports a public IP address and the
    ``running`` state, then polls over SSH until the provisioning marker
    file exists, and finally calls ``mkrepo``.

    :param i: The instance to set up.
    :param git_rev: The revision handed to ``mkrepo``.
    :raises RuntimeError: If either polling loop is exhausted.
    """
    async for remaining in ui.async_timeout_loop(60, 5):
        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
        try:
            i.reload()
            ready = bool(
                i.public_ip_address and i.state and i.state.get("Name") == "running"
            )
        except ClientError:
            # Treat an API error as "not ready yet" and poll again.
            ready = False
        if ready:
            break
    else:
        # The loop ran out without ever seeing a ready instance.
        raise RuntimeError(
            f"Instance {i} did not become ready in a reasonable amount of time"
        )

    async for remaining in ui.async_timeout_loop(300, 5):
        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
        try:
            mssh(i, "[[ -f /opt/provision/done ]]")
        except CalledProcessError:
            continue
        break
    else:
        raise RuntimeError(
            "Instance did not finish setup in a reasonable amount of time"
        )

    mkrepo(i, git_rev)
def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
    """Push a local revision to an instance's repository and check it out.

    :param i: The target instance.
    :param rev: The revision to push; resolved with ``git.rev_parse``.
    :param init: Whether to clone the repository on the instance first.
    :param force: Whether to add ``--force`` to the push.
    """
    if init:
        clone_cmd = "git clone https://github.com/MaterializeInc/materialize.git --recurse-submodules"
        mssh(i, clone_cmd)

    rev = git.rev_parse(rev)

    remote = f"{instance_host(i)}:materialize/.git"
    # Explicit refspec is required if the host repository is in detached
    # HEAD mode.
    refspec = f"{rev}:refs/heads/scratch"
    push_cmd: list[str] = [
        "git",
        "push",
        "--no-verify",
        remote,
        refspec,
        "--no-recurse-submodules",
        *(["--force"] if force else []),
    ]
    # Make git push go through the same SSH wrapper as mssh.
    push_env = dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND))
    spawn.runv(push_cmd, cwd=MZ_ROOT, env=push_env)

    remote_steps = [
        "cd materialize",
        "git config core.bare false",
        f"git checkout {rev}",
        "git submodule sync --recursive",
        "git submodule update --recursive",
    ]
    mssh(i, " && ".join(remote_steps))
class MachineDesc(BaseModel):
    """Description of one machine to launch as part of a scratch cluster."""

    # Short machine name; launch_cluster uses it for the display name and
    # for the /etc/hosts entry.
    name: str
    # Script launch_cluster runs with `bash -c` from the `materialize`
    # directory once setup is done; nothing is run when unset.
    launch_script: str | None
    # EC2 instance type passed to launch().
    instance_type: str
    # Image ID passed to launch().
    ami: str
    # Per-machine tags; launch_cluster merges its extra_tags over these.
    tags: dict[str, str] = {}
    # Volume size in GB passed to launch().
    size_gb: int
    # When false, launch_cluster sets the machine up with "HEAD" rather
    # than the requested git_rev.
    checkout: bool = True
    # Login user passed to launch().
    ami_user: str = "ubuntu"
Usage documentation: Models
A base class for creating Pydantic models.
Attributes:
__class_vars__: The names of the class variables defined on the model.
__private_attributes__: Metadata about the private attributes of the model.
__signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
def launch_cluster(
    descs: list[MachineDesc],
    *,
    nonce: str | None = None,
    key_name: str | None = None,
    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
    extra_tags: dict[str, str] | None = None,
    delete_after: datetime.datetime,
    git_rev: str = "HEAD",
    extra_env: dict[str, str] | None = None,
) -> list[Instance]:
    """Launch a cluster of instances with a given nonce.

    :param descs: One description per machine to launch.
    :param nonce: Shared identifier; generated with ``util.nonce(8)`` if
        not supplied.
    :param key_name: Name of the EC2 key pair to attach, if any.
    :param security_group_name: Security group for every instance.
    :param instance_profile: IAM instance profile name, if any.
    :param extra_tags: Tags applied to every instance; they override the
        per-machine tags on conflict.
    :param delete_after: Recorded on each instance for later cleanup.
    :param git_rev: Revision pushed to machines whose ``checkout`` is true;
        the others receive ``"HEAD"``.
    :param extra_env: Environment variables set for each launch script.
    :return: The launched instances, in the same order as ``descs``.
    """
    if not nonce:
        nonce = util.nonce(8)
    # None stands in for "no extras" so no mutable default is shared
    # between calls.
    extra_tags = extra_tags or {}
    extra_env = extra_env or {}

    instances = [
        launch(
            key_name=key_name,
            instance_type=d.instance_type,
            ami=d.ami,
            ami_user=d.ami_user,
            tags={**d.tags, **extra_tags},
            display_name=f"{nonce}-{d.name}",
            size_gb=d.size_gb,
            security_group_name=security_group_name,
            instance_profile=instance_profile,
            nonce=nonce,
            delete_after=delete_after,
        )
        for d in descs
    ]

    async def setup_all() -> None:
        await asyncio.gather(
            *(
                setup(i, git_rev if d.checkout else "HEAD")
                for (i, d) in zip(instances, descs)
            )
        )

    # asyncio.run replaces the deprecated get_event_loop/run_until_complete.
    asyncio.run(setup_all())

    # Let every machine resolve every other machine by its short name.
    hosts_str = "".join(
        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
    )
    for i in instances:
        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())

    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in extra_env.items())
    for i, d in zip(instances, descs):
        if d.launch_script:
            mssh(
                i,
                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
            )

    return instances
Launch a cluster of instances with a given nonce; a nonce is generated when none is supplied.
def get_instance(name: str) -> Instance:
    """
    Look up an instance by id, or resolve the special name 'mine'.

    'mine' resolves to the single pending or running instance whose
    LaunchedBy tag matches the caller; any other name is taken to be an
    instance id.
    :param name: The instance id or the special case 'mine'.
    :return: The instance to which the name refers.
    :raises RuntimeError: If 'mine' matches no instance or more than one.
    """
    ec2 = boto3.resource("ec2")
    if name != "mine":
        return ec2.Instance(name)

    owned_filters: list[FilterTypeDef] = [
        {"Name": "tag:LaunchedBy", "Values": [whoami()]},
        {"Name": "instance-state-name", "Values": ["pending", "running"]},
    ]
    owned = list(ec2.instances.filter(Filters=owned_filters))

    if len(owned) == 0:
        raise RuntimeError("can't understand 'mine': no owned instance?")
    if len(owned) > 1:
        owned_ids = ", ".join(inst.id for inst in owned)
        raise RuntimeError(
            f"can't understand 'mine': too many owned instances ({owned_ids})"
        )

    (only,) = owned
    say(f"understanding 'mine' as unique owned instance {only.id}")
    return only
Get an instance by instance id. The special name 'mine' resolves to a unique running owned instance, if there is one; otherwise the name is assumed to be an instance id.
Parameters
- name: The instance id or the special case 'mine'.
Returns
The instance to which the name refers.
def get_old_instances() -> list[InstanceTypeDef]:
    """Return non-terminated instances whose delete-after time has passed.

    Instances without a ``scratch-delete-after`` tag are never returned.

    :return: The matching ``describe_instances`` records.
    """

    def exists(i: InstanceTypeDef) -> bool:
        return i["State"]["Name"] != "terminated"

    def is_old(i: InstanceTypeDef) -> bool:
        delete_after = instance_typedef_tags(i).get("scratch-delete-after")
        if delete_after is None:
            return False
        # Same value as the deprecated utcnow(): the current UTC wall clock
        # as a naive datetime.
        # NOTE(review): timestamp() then treats that naive value as local
        # time; confirm this matches how the tag is written before changing.
        naive_utc_now = datetime.datetime.now(datetime.timezone.utc).replace(
            tzinfo=None
        )
        return naive_utc_now.timestamp() > float(delete_after)

    return [
        i
        for r in boto3.client("ec2").describe_instances()["Reservations"]
        for i in r["Instances"]
        if exists(i) and is_old(i)
    ]
def mssh(
    instance: Instance,
    command: str,
    *,
    extra_ssh_args: list[str] | None = None,
    input: bytes | None = None,
) -> None:
    """Runs a command over SSH via EC2 Instance Connect.

    :param instance: The instance to connect to.
    :param command: The remote command; an empty string is passed through
        unquoted.
    :param extra_ssh_args: Extra arguments placed before the host.
    :param input: Bytes fed to the process's standard input, if any.
    :raises CalledProcessError: If the process exits with a non-zero status.
    """
    host = instance_host(instance)
    if command:
        print(f"{host}$ {command}", file=sys.stderr)
        # Quote to work around:
        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
        command = shlex.quote(command)
    else:
        print(f"$ mssh {host}")

    subprocess.run(
        [
            *SSH_COMMAND,
            # None means "no extra arguments"; avoids a shared mutable default.
            *(extra_ssh_args or []),
            host,
            command,
        ],
        check=True,
        input=input,
    )
Runs a command over SSH via EC2 Instance Connect.
def msftp(
    instance: Instance,
) -> None:
    """Open an SFTP session to the instance via EC2 Instance Connect."""
    sftp_argv = list(SFTP_COMMAND)
    sftp_argv.append(instance_host(instance))
    spawn.runv(sftp_argv)
Connects over SFTP via EC2 Instance Connect.