misc.python.materialize.scratch
Utilities for launching and interacting with scratch EC2 instances.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for launching and interacting with scratch EC2 instances."""

import asyncio
import csv
import datetime
import os
import shlex
import subprocess
import sys
from subprocess import CalledProcessError
from typing import NamedTuple, cast

import boto3
from botocore.exceptions import ClientError
from mypy_boto3_ec2.literals import InstanceTypeType
from mypy_boto3_ec2.service_resource import Instance
from mypy_boto3_ec2.type_defs import (
    FilterTypeDef,
    InstanceNetworkInterfaceSpecificationTypeDef,
    InstanceTypeDef,
    RunInstancesRequestServiceResourceCreateInstancesTypeDef,
)
from prettytable import PrettyTable
from pydantic import BaseModel

from materialize import MZ_ROOT, git, spawn, ui, util

# Sane defaults for internal Materialize use in the scratch account
DEFAULT_SECURITY_GROUP_NAME = "scratch-security-group"
DEFAULT_INSTANCE_PROFILE_NAME = "admin-instance"

SSH_COMMAND = ["mssh", "-o", "StrictHostKeyChecking=off"]
SFTP_COMMAND = ["msftp", "-o", "StrictHostKeyChecking=off"]

say = ui.speaker("scratch> ")


def tags(i: Instance) -> dict[str, str]:
    """Return the tags of a boto3 ``Instance`` resource as a plain dict."""
    if not i.tags:
        return {}
    return {t["Key"]: t["Value"] for t in i.tags}


def instance_typedef_tags(i: InstanceTypeDef) -> dict[str, str]:
    """Return the tags of a ``describe_instances`` instance entry as a dict."""
    return {t["Key"]: t["Value"] for t in i.get("Tags", [])}


def name(tags: dict[str, str]) -> str | None:
    """Return the ``Name`` tag, or None if it is not set."""
    return tags.get("Name")


def launched_by(tags: dict[str, str]) -> str | None:
    """Return the ``LaunchedBy`` tag, or None if it is not set."""
    return tags.get("LaunchedBy")


def ami_user(tags: dict[str, str]) -> str | None:
    """Return the ``ami-user`` tag, falling back to ``"ubuntu"``."""
    return tags.get("ami-user", "ubuntu")


def delete_after(tags: dict[str, str]) -> datetime.datetime | None:
    """Parse the ``scratch-delete-after`` tag into a naive datetime.

    Returns None when the tag is missing or empty. Fractional seconds in the
    stored value are truncated.
    """
    unix = tags.get("scratch-delete-after")
    if not unix:
        return None
    unix = int(float(unix))
    return datetime.datetime.fromtimestamp(unix)


def instance_host(instance: Instance, user: str | None = None) -> str:
    """Return the ``user@instance-id`` string used to address the instance.

    When ``user`` is None, the user is taken from the instance's tags via
    `ami_user`.
    """
    if user is None:
        user = ami_user(tags(instance))
    return f"{user}@{instance.id}"


def print_instances(ists: list[Instance], format: str) -> None:
    """Print a one-row-per-instance summary to stdout.

    :param ists: The instances to describe.
    :param format: ``"table"`` for a PrettyTable rendering or ``"csv"`` for
        CSV output.
    :raises RuntimeError: If ``format`` is not one of the supported values.
    """
    field_names = [
        "Name",
        "Instance ID",
        "Public IP Address",
        "Private IP Address",
        "Launched By",
        "Delete After",
        "State",
    ]

    rows = []
    for inst in ists:
        # Fetch the tags once per instance; three columns are derived from them.
        inst_tags = tags(inst)
        rows.append(
            [
                name(inst_tags),
                inst.instance_id,
                inst.public_ip_address,
                inst.private_ip_address,
                launched_by(inst_tags),
                delete_after(inst_tags),
                inst.state["Name"],
            ]
        )

    if format == "csv":
        writer = csv.writer(sys.stdout)
        writer.writerow(field_names)
        writer.writerows(rows)
        return

    if format == "table":
        table = PrettyTable()
        table.field_names = field_names
        table.add_rows(rows)
        print(table)
        return

    raise RuntimeError("Unknown format passed to print_instances")


def launch(
    *,
    key_name: str | None,
    instance_type: str,
    ami: str,
    ami_user: str,
    tags: dict[str, str],
    display_name: str | None = None,
    size_gb: int,
    security_group_name: str,
    instance_profile: str | None,
    nonce: str,
    delete_after: datetime.datetime,
) -> Instance:
    """Launch and configure an ec2 instance with the given properties.

    The security group named ``security_group_name`` is looked up; if it does
    not exist it is created in the default VPC (which is itself created when
    absent) with inbound TCP port 22 open to ``0.0.0.0/0``. The contents of
    ``misc/scratch/provision.bash`` are passed as the instance's user data.

    :param key_name: EC2 key pair name; omitted from the request when falsy.
    :param instance_type: EC2 instance type string.
    :param ami: ID of the image to boot.
    :param ami_user: Login user, recorded in the ``ami-user`` tag.
    :param tags: Tags to apply to the instance. The dict is not modified.
    :param display_name: Value for the ``Name`` tag, if any.
    :param size_gb: ``VolumeSize`` of the ``/dev/sda1`` gp3 volume.
    :param security_group_name: Name of the security group to use or create.
    :param instance_profile: IAM instance profile name; omitted when falsy.
    :param nonce: Value recorded in the ``nonce`` tag.
    :param delete_after: Its ``timestamp()`` is recorded in the
        ``scratch-delete-after`` tag.
    :return: The created instance. This function does not wait for it to
        start.
    """
    # Work on a copy so that the caller's dict is not modified as a side
    # effect of launching.
    tags = dict(tags)
    if display_name:
        tags["Name"] = display_name
    tags["scratch-delete-after"] = str(delete_after.timestamp())
    tags["nonce"] = nonce
    tags["git_ref"] = git.describe()
    tags["ami-user"] = ami_user

    ec2 = boto3.client("ec2")
    groups = ec2.describe_security_groups()
    security_group_id = next(
        (
            group["GroupId"]
            for group in groups["SecurityGroups"]
            if group["GroupName"] == security_group_name
        ),
        None,
    )

    if security_group_id is None:
        vpcs = ec2.describe_vpcs()
        vpc_id = next(
            (vpc["VpcId"] for vpc in vpcs["Vpcs"] if vpc["IsDefault"]),
            None,
        )
        if vpc_id is None:
            default_vpc = ec2.create_default_vpc()
            vpc_id = default_vpc["Vpc"]["VpcId"]
        securitygroup = ec2.create_security_group(
            GroupName=security_group_name,
            Description="Allows all.",
            VpcId=vpc_id,
        )
        security_group_id = securitygroup["GroupId"]
        ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            CidrIp="0.0.0.0/0",
            IpProtocol="tcp",
            FromPort=22,
            ToPort=22,
        )

    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
        "AssociatePublicIpAddress": True,
        "DeviceIndex": 0,
        "Groups": [security_group_id],
    }

    say(f"launching instance {display_name or '(unnamed)'}")
    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
        provisioning_script = f.read()
    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
        "MinCount": 1,
        "MaxCount": 1,
        "ImageId": ami,
        "InstanceType": cast(InstanceTypeType, instance_type),
        "UserData": provisioning_script,
        "TagSpecifications": [
            {
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
            }
        ],
        "NetworkInterfaces": [network_interface],
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                    "VolumeSize": size_gb,
                    "VolumeType": "gp3",
                },
            }
        ],
        "MetadataOptions": {
            # Allow Docker containers to access IMDSv2.
            "HttpPutResponseHopLimit": 2,
        },
    }
    if key_name:
        kwargs["KeyName"] = key_name
    if instance_profile:
        kwargs["IamInstanceProfile"] = {"Name": instance_profile}

    return boto3.resource("ec2").create_instances(**kwargs)[0]


class CommandResult(NamedTuple):
    """Status and captured output streams of a command."""

    status: str
    stdout: str
    stderr: str


async def setup(
    i: Instance,
    git_rev: str,
) -> None:
    """Wait for an instance to be usable, then push the repository to it.

    First polls (``ui.async_timeout_loop(60, 5)``) until the instance reports
    the ``running`` state and has a public IP address, then polls
    (``ui.async_timeout_loop(300, 5)``) over SSH until
    ``/opt/provision/done`` exists, and finally calls `mkrepo`.

    :param i: The instance to set up.
    :param git_rev: Revision handed to `mkrepo`.
    :raises RuntimeError: If either polling loop ends without success.
    """
    async for remaining in ui.async_timeout_loop(60, 5):
        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
        try:
            i.reload()
            ready = bool(
                i.public_ip_address and i.state and i.state.get("Name") == "running"
            )
        except ClientError:
            # Treat an API error as "not ready yet" and poll again.
            ready = False
        if ready:
            break
    else:
        raise RuntimeError(
            f"Instance {i} did not become ready in a reasonable amount of time"
        )

    async for remaining in ui.async_timeout_loop(300, 5):
        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
        try:
            mssh(i, "[[ -f /opt/provision/done ]]")
        except CalledProcessError:
            # Marker file missing (or SSH not up yet): keep polling.
            continue
        break
    else:
        raise RuntimeError(
            "Instance did not finish setup in a reasonable amount of time"
        )

    mkrepo(i, git_rev)


def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
    """Push a local revision to the instance and check it out there.

    :param i: The target instance.
    :param rev: Local revision; resolved with ``git.rev_parse`` before pushing.
    :param init: When True, first clone the materialize repository on the
        instance.
    :param force: When True, pass ``--force`` to ``git push``.
    """
    if init:
        mssh(i, "git clone https://github.com/MaterializeInc/materialize.git")

    rev = git.rev_parse(rev)

    push_cmd: list[str] = ["git", "push", "--no-verify"]
    push_cmd.append(f"{instance_host(i)}:materialize/.git")
    # Explicit refspec is required if the host repository is in detached
    # HEAD mode.
    push_cmd.append(f"{rev}:refs/heads/scratch")
    push_cmd.append("--no-recurse-submodules")
    if force:
        push_cmd.append("--force")

    # Make git use the same SSH wrapper and options as `mssh`.
    push_env = dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND))
    spawn.runv(push_cmd, cwd=MZ_ROOT, env=push_env)

    remote_steps = [
        "cd materialize",
        "git config core.bare false",
        f"git checkout {rev}",
        "git submodule sync --recursive",
        "git submodule update --recursive",
    ]
    mssh(i, " && ".join(remote_steps))


class MachineDesc(BaseModel):
    """Description of one machine to be launched by `launch_cluster`."""

    # Short machine name; combined with the nonce for the instance's display
    # name and used as the hostname written to /etc/hosts.
    name: str
    # Shell snippet started in the background from the `materialize`
    # directory once setup is done; nothing is started when it is falsy.
    launch_script: str | None
    # EC2 instance type string, passed through to `launch`.
    instance_type: str
    # ID of the image to boot.
    ami: str
    # Per-machine tags; launch_cluster's extra_tags take precedence on
    # conflicting keys.
    tags: dict[str, str] = {}
    # Passed to `launch` as the VolumeSize of the /dev/sda1 volume.
    size_gb: int
    # When True, launch_cluster's git_rev is pushed to the machine; otherwise
    # "HEAD" is pushed.
    checkout: bool = True
    # Login user, recorded in the "ami-user" tag.
    ami_user: str = "ubuntu"


def launch_cluster(
    descs: list[MachineDesc],
    *,
    nonce: str | None = None,
    key_name: str | None = None,
    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
    extra_tags: dict[str, str] | None = None,
    delete_after: datetime.datetime,
    git_rev: str = "HEAD",
    extra_env: dict[str, str] | None = None,
) -> list[Instance]:
    """Launch a cluster of instances with a given nonce.

    Launches one instance per description, waits for all of them to finish
    `setup`, appends every machine's private IP address and name to each
    instance's ``/etc/hosts``, and finally starts each machine's
    ``launch_script`` (if any) in the background.

    :param descs: The machines to launch.
    :param nonce: Shared identifier for the cluster; generated with
        ``util.nonce(8)`` when falsy.
    :param key_name: Passed through to `launch`.
    :param security_group_name: Passed through to `launch`.
    :param instance_profile: Passed through to `launch`.
    :param extra_tags: Tags applied to every instance, overriding each
        description's own tags on conflicting keys. Defaults to no extra tags.
    :param delete_after: Passed through to `launch`.
    :param git_rev: Revision pushed to machines whose ``checkout`` is True.
    :param extra_env: Environment variables prefixed to each launch script.
        Defaults to none.
    :return: The launched instances, in the same order as ``descs``.
    """
    if not nonce:
        nonce = util.nonce(8)
    shared_tags = extra_tags or {}
    launch_env = extra_env or {}

    instances = [
        launch(
            key_name=key_name,
            instance_type=d.instance_type,
            ami=d.ami,
            ami_user=d.ami_user,
            tags={**d.tags, **shared_tags},
            display_name=f"{nonce}-{d.name}",
            size_gb=d.size_gb,
            security_group_name=security_group_name,
            instance_profile=instance_profile,
            nonce=nonce,
            delete_after=delete_after,
        )
        for d in descs
    ]

    async def setup_all() -> None:
        # gather() must be called with a running event loop, hence the
        # wrapper coroutine.
        await asyncio.gather(
            *(
                setup(i, git_rev if d.checkout else "HEAD")
                for (i, d) in zip(instances, descs)
            )
        )

    # asyncio.run() replaces get_event_loop().run_until_complete(), which is
    # deprecated when no event loop is running.
    asyncio.run(setup_all())

    hosts_str = "".join(
        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
    )
    for i in instances:
        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())

    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in launch_env.items())
    for i, d in zip(instances, descs):
        if d.launch_script:
            mssh(
                i,
                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
            )

    return instances


def whoami() -> str:
    """Return the part after the first ``:`` of the caller's STS ``UserId``."""
    return boto3.client("sts").get_caller_identity()["UserId"].split(":")[1]


def get_instance(name: str) -> Instance:
    """
    Get an instance by instance id. The special name 'mine' resolves to the
    unique pending or running instance whose ``LaunchedBy`` tag matches
    `whoami`; any other name is assumed to be an instance id.
    :param name: The instance id or the special case 'mine'.
    :return: The instance to which the name refers.
    :raises RuntimeError: If 'mine' matches no instance or more than one.
    """
    if name != "mine":
        return boto3.resource("ec2").Instance(name)

    filters: list[FilterTypeDef] = [
        {"Name": "tag:LaunchedBy", "Values": [whoami()]},
        {"Name": "instance-state-name", "Values": ["pending", "running"]},
    ]
    owned = list(boto3.resource("ec2").instances.filter(Filters=filters))
    if not owned:
        raise RuntimeError("can't understand 'mine': no owned instance?")
    if len(owned) > 1:
        ids = ", ".join(inst.id for inst in owned)
        raise RuntimeError(
            f"can't understand 'mine': too many owned instances ({ids})"
        )

    (instance,) = owned
    say(f"understanding 'mine' as unique owned instance {instance.id}")
    return instance


def get_instances_by_tag(k: str, v: str) -> list[InstanceTypeDef]:
    """Return all described instances whose tag ``k`` equals ``v``."""
    return [
        i
        for r in boto3.client("ec2").describe_instances()["Reservations"]
        for i in r["Instances"]
        if instance_typedef_tags(i).get(k) == v
    ]


def get_old_instances() -> list[InstanceTypeDef]:
    """Return non-terminated instances whose deletion deadline has passed.

    An instance qualifies when its state is not ``terminated`` and it has a
    ``scratch-delete-after`` tag whose value is less than the current time as
    computed below. Instances without the tag never qualify.
    """

    def is_expired(i: InstanceTypeDef) -> bool:
        if i["State"]["Name"] == "terminated":
            return False
        raw_deadline = instance_typedef_tags(i).get("scratch-delete-after")
        if raw_deadline is None:
            return False
        deadline = float(raw_deadline)
        # NOTE(review): utcnow() is naive and .timestamp() interprets naive
        # datetimes as local time, so this is a true epoch value only when
        # the local zone is UTC. Whether it matches the stored tag depends on
        # how callers construct the `delete_after` passed to launch() --
        # confirm before changing.
        return datetime.datetime.utcnow().timestamp() > deadline

    reservations = boto3.client("ec2").describe_instances()["Reservations"]
    expired = []
    for reservation in reservations:
        expired.extend(i for i in reservation["Instances"] if is_expired(i))
    return expired


def mssh(
    instance: Instance,
    command: str,
    *,
    extra_ssh_args: list[str] | None = None,
    input: bytes | None = None,
) -> None:
    """Runs a command over SSH via EC2 Instance Connect.

    :param instance: The instance to connect to.
    :param command: Shell command to run remotely. When empty, no command is
        quoted and an empty argument is passed to ``mssh``.
    :param extra_ssh_args: Additional arguments inserted before the host.
        Defaults to none.
    :param input: Bytes fed to the process's stdin, if any.
    :raises subprocess.CalledProcessError: If ``mssh`` exits non-zero.
    """
    host = instance_host(instance)
    if command:
        print(f"{host}$ {command}", file=sys.stderr)
        # Quote to work around:
        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
        command = shlex.quote(command)
    else:
        print(f"$ mssh {host}")

    subprocess.run(
        [
            *SSH_COMMAND,
            *(extra_ssh_args or []),
            host,
            command,
        ],
        check=True,
        input=input,
    )


def msftp(
    instance: Instance,
) -> None:
    """Open an SFTP session to the instance via EC2 Instance Connect."""
    sftp_argv = list(SFTP_COMMAND)
    sftp_argv.append(instance_host(instance))
    spawn.runv(sftp_argv)
def print_instances(ists: list[Instance], format: str) -> None:
    """Print a one-row-per-instance summary to stdout.

    :param ists: The instances to describe.
    :param format: ``"table"`` for a PrettyTable rendering or ``"csv"`` for
        CSV output.
    :raises RuntimeError: If ``format`` is not one of the supported values.
    """
    field_names = [
        "Name",
        "Instance ID",
        "Public IP Address",
        "Private IP Address",
        "Launched By",
        "Delete After",
        "State",
    ]

    rows = []
    for inst in ists:
        # Fetch the tags once per instance; three columns are derived from them.
        inst_tags = tags(inst)
        rows.append(
            [
                name(inst_tags),
                inst.instance_id,
                inst.public_ip_address,
                inst.private_ip_address,
                launched_by(inst_tags),
                delete_after(inst_tags),
                inst.state["Name"],
            ]
        )

    if format == "csv":
        writer = csv.writer(sys.stdout)
        writer.writerow(field_names)
        writer.writerows(rows)
        return

    if format == "table":
        table = PrettyTable()
        table.field_names = field_names
        table.add_rows(rows)
        print(table)
        return

    raise RuntimeError("Unknown format passed to print_instances")
def launch(
    *,
    key_name: str | None,
    instance_type: str,
    ami: str,
    ami_user: str,
    tags: dict[str, str],
    display_name: str | None = None,
    size_gb: int,
    security_group_name: str,
    instance_profile: str | None,
    nonce: str,
    delete_after: datetime.datetime,
) -> Instance:
    """Launch and configure an ec2 instance with the given properties.

    The security group named ``security_group_name`` is looked up; if it does
    not exist it is created in the default VPC (which is itself created when
    absent) with inbound TCP port 22 open to ``0.0.0.0/0``. The contents of
    ``misc/scratch/provision.bash`` are passed as the instance's user data.

    :param key_name: EC2 key pair name; omitted from the request when falsy.
    :param instance_type: EC2 instance type string.
    :param ami: ID of the image to boot.
    :param ami_user: Login user, recorded in the ``ami-user`` tag.
    :param tags: Tags to apply to the instance. The dict is not modified.
    :param display_name: Value for the ``Name`` tag, if any.
    :param size_gb: ``VolumeSize`` of the ``/dev/sda1`` gp3 volume.
    :param security_group_name: Name of the security group to use or create.
    :param instance_profile: IAM instance profile name; omitted when falsy.
    :param nonce: Value recorded in the ``nonce`` tag.
    :param delete_after: Its ``timestamp()`` is recorded in the
        ``scratch-delete-after`` tag.
    :return: The created instance. This function does not wait for it to
        start.
    """
    # Work on a copy so that the caller's dict is not modified as a side
    # effect of launching.
    tags = dict(tags)
    if display_name:
        tags["Name"] = display_name
    tags["scratch-delete-after"] = str(delete_after.timestamp())
    tags["nonce"] = nonce
    tags["git_ref"] = git.describe()
    tags["ami-user"] = ami_user

    ec2 = boto3.client("ec2")
    groups = ec2.describe_security_groups()
    security_group_id = next(
        (
            group["GroupId"]
            for group in groups["SecurityGroups"]
            if group["GroupName"] == security_group_name
        ),
        None,
    )

    if security_group_id is None:
        vpcs = ec2.describe_vpcs()
        vpc_id = next(
            (vpc["VpcId"] for vpc in vpcs["Vpcs"] if vpc["IsDefault"]),
            None,
        )
        if vpc_id is None:
            default_vpc = ec2.create_default_vpc()
            vpc_id = default_vpc["Vpc"]["VpcId"]
        securitygroup = ec2.create_security_group(
            GroupName=security_group_name,
            Description="Allows all.",
            VpcId=vpc_id,
        )
        security_group_id = securitygroup["GroupId"]
        ec2.authorize_security_group_ingress(
            GroupId=security_group_id,
            CidrIp="0.0.0.0/0",
            IpProtocol="tcp",
            FromPort=22,
            ToPort=22,
        )

    network_interface: InstanceNetworkInterfaceSpecificationTypeDef = {
        "AssociatePublicIpAddress": True,
        "DeviceIndex": 0,
        "Groups": [security_group_id],
    }

    say(f"launching instance {display_name or '(unnamed)'}")
    with open(MZ_ROOT / "misc" / "scratch" / "provision.bash") as f:
        provisioning_script = f.read()
    kwargs: RunInstancesRequestServiceResourceCreateInstancesTypeDef = {
        "MinCount": 1,
        "MaxCount": 1,
        "ImageId": ami,
        "InstanceType": cast(InstanceTypeType, instance_type),
        "UserData": provisioning_script,
        "TagSpecifications": [
            {
                "ResourceType": "instance",
                "Tags": [{"Key": k, "Value": v} for (k, v) in tags.items()],
            }
        ],
        "NetworkInterfaces": [network_interface],
        "BlockDeviceMappings": [
            {
                "DeviceName": "/dev/sda1",
                "Ebs": {
                    "VolumeSize": size_gb,
                    "VolumeType": "gp3",
                },
            }
        ],
        "MetadataOptions": {
            # Allow Docker containers to access IMDSv2.
            "HttpPutResponseHopLimit": 2,
        },
    }
    if key_name:
        kwargs["KeyName"] = key_name
    if instance_profile:
        kwargs["IamInstanceProfile"] = {"Name": instance_profile}

    return boto3.resource("ec2").create_instances(**kwargs)[0]
Launch and configure an ec2 instance with the given properties.
CommandResult(status, stdout, stderr)
async def setup(
    i: Instance,
    git_rev: str,
) -> None:
    """Wait for an instance to be usable, then push the repository to it.

    First polls (``ui.async_timeout_loop(60, 5)``) until the instance reports
    the ``running`` state and has a public IP address, then polls
    (``ui.async_timeout_loop(300, 5)``) over SSH until
    ``/opt/provision/done`` exists, and finally calls `mkrepo`.

    :param i: The instance to set up.
    :param git_rev: Revision handed to `mkrepo`.
    :raises RuntimeError: If either polling loop ends without success.
    """
    async for remaining in ui.async_timeout_loop(60, 5):
        say(f"Waiting for instance to become ready: {remaining:0.0f}s remaining")
        try:
            i.reload()
            ready = bool(
                i.public_ip_address and i.state and i.state.get("Name") == "running"
            )
        except ClientError:
            # Treat an API error as "not ready yet" and poll again.
            ready = False
        if ready:
            break
    else:
        raise RuntimeError(
            f"Instance {i} did not become ready in a reasonable amount of time"
        )

    async for remaining in ui.async_timeout_loop(300, 5):
        say(f"Checking whether setup has completed: {remaining:0.0f}s remaining")
        try:
            mssh(i, "[[ -f /opt/provision/done ]]")
        except CalledProcessError:
            # Marker file missing (or SSH not up yet): keep polling.
            continue
        break
    else:
        raise RuntimeError(
            "Instance did not finish setup in a reasonable amount of time"
        )

    mkrepo(i, git_rev)
def mkrepo(i: Instance, rev: str, init: bool = True, force: bool = False) -> None:
    """Push a local revision to the instance and check it out there.

    :param i: The target instance.
    :param rev: Local revision; resolved with ``git.rev_parse`` before pushing.
    :param init: When True, first clone the materialize repository on the
        instance.
    :param force: When True, pass ``--force`` to ``git push``.
    """
    if init:
        mssh(i, "git clone https://github.com/MaterializeInc/materialize.git")

    rev = git.rev_parse(rev)

    push_cmd: list[str] = ["git", "push", "--no-verify"]
    push_cmd.append(f"{instance_host(i)}:materialize/.git")
    # Explicit refspec is required if the host repository is in detached
    # HEAD mode.
    push_cmd.append(f"{rev}:refs/heads/scratch")
    push_cmd.append("--no-recurse-submodules")
    if force:
        push_cmd.append("--force")

    # Make git use the same SSH wrapper and options as `mssh`.
    push_env = dict(os.environ, GIT_SSH_COMMAND=" ".join(SSH_COMMAND))
    spawn.runv(push_cmd, cwd=MZ_ROOT, env=push_env)

    remote_steps = [
        "cd materialize",
        "git config core.bare false",
        f"git checkout {rev}",
        "git submodule sync --recursive",
        "git submodule update --recursive",
    ]
    mssh(i, " && ".join(remote_steps))
class MachineDesc(BaseModel):
    """Description of one machine to be launched by `launch_cluster`."""

    # Short machine name; combined with the nonce for the instance's display
    # name and used as the hostname written to /etc/hosts.
    name: str
    # Shell snippet started in the background from the `materialize`
    # directory once setup is done; nothing is started when it is falsy.
    launch_script: str | None
    # EC2 instance type string, passed through to `launch`.
    instance_type: str
    # ID of the image to boot.
    ami: str
    # Per-machine tags; launch_cluster's extra_tags take precedence on
    # conflicting keys.
    tags: dict[str, str] = {}
    # Passed to `launch` as the VolumeSize of the /dev/sda1 volume.
    size_gb: int
    # When True, launch_cluster's git_rev is pushed to the machine; otherwise
    # "HEAD" is pushed.
    checkout: bool = True
    # Login user, recorded in the "ami-user" tag.
    ami_user: str = "ubuntu"
Usage docs: https://docs.pydantic.dev/2.8/concepts/models/
A base class for creating Pydantic models.
Attributes: __class_vars__: The names of classvars defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The signature for instantiating the model.
__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
__args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a `RootModel`.
__pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_extra__: An instance attribute with the values of extra fields from validation when
`model_config['extra'] == 'allow'`.
__pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
__pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
def launch_cluster(
    descs: list[MachineDesc],
    *,
    nonce: str | None = None,
    key_name: str | None = None,
    security_group_name: str = DEFAULT_SECURITY_GROUP_NAME,
    instance_profile: str | None = DEFAULT_INSTANCE_PROFILE_NAME,
    extra_tags: dict[str, str] | None = None,
    delete_after: datetime.datetime,
    git_rev: str = "HEAD",
    extra_env: dict[str, str] | None = None,
) -> list[Instance]:
    """Launch a cluster of instances with a given nonce.

    Launches one instance per description, waits for all of them to finish
    `setup`, appends every machine's private IP address and name to each
    instance's ``/etc/hosts``, and finally starts each machine's
    ``launch_script`` (if any) in the background.

    :param descs: The machines to launch.
    :param nonce: Shared identifier for the cluster; generated with
        ``util.nonce(8)`` when falsy.
    :param key_name: Passed through to `launch`.
    :param security_group_name: Passed through to `launch`.
    :param instance_profile: Passed through to `launch`.
    :param extra_tags: Tags applied to every instance, overriding each
        description's own tags on conflicting keys. Defaults to no extra tags.
    :param delete_after: Passed through to `launch`.
    :param git_rev: Revision pushed to machines whose ``checkout`` is True.
    :param extra_env: Environment variables prefixed to each launch script.
        Defaults to none.
    :return: The launched instances, in the same order as ``descs``.
    """
    if not nonce:
        nonce = util.nonce(8)
    shared_tags = extra_tags or {}
    launch_env = extra_env or {}

    instances = [
        launch(
            key_name=key_name,
            instance_type=d.instance_type,
            ami=d.ami,
            ami_user=d.ami_user,
            tags={**d.tags, **shared_tags},
            display_name=f"{nonce}-{d.name}",
            size_gb=d.size_gb,
            security_group_name=security_group_name,
            instance_profile=instance_profile,
            nonce=nonce,
            delete_after=delete_after,
        )
        for d in descs
    ]

    async def setup_all() -> None:
        # gather() must be called with a running event loop, hence the
        # wrapper coroutine.
        await asyncio.gather(
            *(
                setup(i, git_rev if d.checkout else "HEAD")
                for (i, d) in zip(instances, descs)
            )
        )

    # asyncio.run() replaces get_event_loop().run_until_complete(), which is
    # deprecated when no event loop is running.
    asyncio.run(setup_all())

    hosts_str = "".join(
        f"{i.private_ip_address}\t{d.name}\n" for (i, d) in zip(instances, descs)
    )
    for i in instances:
        mssh(i, "sudo tee -a /etc/hosts", input=hosts_str.encode())

    env = " ".join(f"{k}={shlex.quote(v)}" for k, v in launch_env.items())
    for i, d in zip(instances, descs):
        if d.launch_script:
            mssh(
                i,
                f"(cd materialize && {env} nohup bash -c {shlex.quote(d.launch_script)}) &> mzscratch.log &",
            )

    return instances
Launch a cluster of instances that share a given nonce (one is generated if none is supplied).
def get_instance(name: str) -> Instance:
    """
    Get an instance by instance id. The special name 'mine' resolves to the
    unique pending or running instance whose ``LaunchedBy`` tag matches
    `whoami`; any other name is assumed to be an instance id.
    :param name: The instance id or the special case 'mine'.
    :return: The instance to which the name refers.
    :raises RuntimeError: If 'mine' matches no instance or more than one.
    """
    if name != "mine":
        return boto3.resource("ec2").Instance(name)

    filters: list[FilterTypeDef] = [
        {"Name": "tag:LaunchedBy", "Values": [whoami()]},
        {"Name": "instance-state-name", "Values": ["pending", "running"]},
    ]
    owned = list(boto3.resource("ec2").instances.filter(Filters=filters))
    if not owned:
        raise RuntimeError("can't understand 'mine': no owned instance?")
    if len(owned) > 1:
        ids = ", ".join(inst.id for inst in owned)
        raise RuntimeError(
            f"can't understand 'mine': too many owned instances ({ids})"
        )

    (instance,) = owned
    say(f"understanding 'mine' as unique owned instance {instance.id}")
    return instance
Get an instance by instance id. The special name 'mine' resolves to the unique pending or running instance owned by the caller, if there is exactly one; otherwise the name is assumed to be an instance id.
Parameters
- name: The instance id or the special case 'mine'.
Returns
The instance to which the name refers.
def get_old_instances() -> list[InstanceTypeDef]:
    """Return non-terminated instances whose deletion deadline has passed.

    An instance qualifies when its state is not ``terminated`` and it has a
    ``scratch-delete-after`` tag whose value is less than the current time as
    computed below. Instances without the tag never qualify.
    """

    def is_expired(i: InstanceTypeDef) -> bool:
        if i["State"]["Name"] == "terminated":
            return False
        raw_deadline = instance_typedef_tags(i).get("scratch-delete-after")
        if raw_deadline is None:
            return False
        deadline = float(raw_deadline)
        # NOTE(review): utcnow() is naive and .timestamp() interprets naive
        # datetimes as local time, so this is a true epoch value only when
        # the local zone is UTC. Whether it matches the stored tag depends on
        # how callers construct the `delete_after` passed to launch() --
        # confirm before changing.
        return datetime.datetime.utcnow().timestamp() > deadline

    reservations = boto3.client("ec2").describe_instances()["Reservations"]
    expired = []
    for reservation in reservations:
        expired.extend(i for i in reservation["Instances"] if is_expired(i))
    return expired
def mssh(
    instance: Instance,
    command: str,
    *,
    extra_ssh_args: list[str] | None = None,
    input: bytes | None = None,
) -> None:
    """Runs a command over SSH via EC2 Instance Connect.

    :param instance: The instance to connect to.
    :param command: Shell command to run remotely. When empty, no command is
        quoted and an empty argument is passed to ``mssh``.
    :param extra_ssh_args: Additional arguments inserted before the host.
        Defaults to none.
    :param input: Bytes fed to the process's stdin, if any.
    :raises subprocess.CalledProcessError: If ``mssh`` exits non-zero.
    """
    host = instance_host(instance)
    if command:
        print(f"{host}$ {command}", file=sys.stderr)
        # Quote to work around:
        # https://github.com/aws/aws-ec2-instance-connect-cli/pull/26
        command = shlex.quote(command)
    else:
        print(f"$ mssh {host}")

    subprocess.run(
        [
            *SSH_COMMAND,
            *(extra_ssh_args or []),
            host,
            command,
        ],
        check=True,
        input=input,
    )
Runs a command over SSH via EC2 Instance Connect.
def msftp(
    instance: Instance,
) -> None:
    """Open an SFTP session to the instance via EC2 Instance Connect."""
    sftp_argv = list(SFTP_COMMAND)
    sftp_argv.append(instance_host(instance))
    spawn.runv(sftp_argv)
Connects over SFTP via EC2 Instance Connect.