Module materialize.cli.scratch.destroy
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import argparse
import sys
import boto3
from mypy_boto3_ec2.type_defs import FilterTypeDef
from materialize.cli.scratch import check_required_vars
from materialize.scratch import print_instances, ui, whoami
def configure_parser(parser: argparse.ArgumentParser) -> None:
    """Register the command-line arguments that `run` consumes.

    Args:
        parser: The argument parser to extend in place.
    """
    # Positional instance IDs. Zero are accepted at parse time because
    # `--all-mine` is the alternative way of selecting instances.
    parser.add_argument("instances", nargs="*", help="Instance IDs to destroy")

    # Boolean switches, registered in the order they should be listed.
    switches = (
        (
            ("--all-mine",),
            "Destroy all of your instances (incompatible with specifying instance IDs)",
        ),
        (
            ("-y", "--yes"),
            "Don't ask for confirmation before destroying",
        ),
    )
    for option_strings, description in switches:
        parser.add_argument(*option_strings, action="store_true", help=description)

    # Format handed to `print_instances` when listing what will be destroyed.
    parser.add_argument("--output-format", default="table", choices=["table", "csv"])
def run(args: argparse.Namespace) -> None:
    """Terminate the selected EC2 instances after listing and confirming them.

    Instances are selected either by explicit ID (`args.instances`) or, with
    `args.all_mine`, by a `LaunchedBy` tag equal to `whoami()`. In both cases
    only instances in the pending, running, stopping, or stopped state are
    considered. If nothing matches, a message is printed and the function
    returns without prompting.

    Args:
        args: Parsed arguments as registered by `configure_parser`.

    Raises:
        SystemExit: With status 1 if `--all-mine` is combined with instance
            IDs, or if neither is supplied; with status 0 if the user
            declines the confirmation prompt.
    """
    # NOTE(review): presumably validates required environment/configuration
    # before talking to AWS — confirm against its definition.
    check_required_vars()

    # Validate the argument combination up front, before any AWS call.
    if args.all_mine and args.instances:
        print(
            "scratch: error: cannot specify --all-mine and instance IDs",
            file=sys.stderr,
        )
        sys.exit(1)
    if not args.all_mine and not args.instances:
        print(
            "scratch: error: must supply at least one instance ID to destroy",
            file=sys.stderr,
        )
        sys.exit(1)

    # The state filter applies to both selection modes.
    filters: list[FilterTypeDef] = [
        {
            "Name": "instance-state-name",
            "Values": ["pending", "running", "stopping", "stopped"],
        }
    ]
    instance_ids: list[str] = []
    if args.all_mine:
        filters.append({"Name": "tag:LaunchedBy", "Values": [whoami()]})
    else:
        instance_ids.extend(args.instances)

    # Materialize the collection so the listing and the termination loop
    # operate on exactly the same set of instances.
    instances = list(
        boto3.resource("ec2").instances.filter(
            Filters=filters, InstanceIds=instance_ids
        )
    )

    # Nothing matched: do not ask for confirmation or claim that anything
    # was destroyed.
    if not instances:
        print("No matching instances to destroy.")
        return

    print("Destroying instances:")
    print_instances(instances, args.output_format)

    if not args.yes and not ui.confirm("Would you like to continue?"):
        sys.exit(0)

    for instance in instances:
        instance.terminate()
    print("Instances destroyed.")
Functions
def configure_parser(parser: argparse.ArgumentParser) ‑> None
-
Expand source code Browse git
def configure_parser(parser: argparse.ArgumentParser) -> None:
    """Register the command-line arguments that `run` consumes.

    Args:
        parser: The argument parser to extend in place.
    """
    # Positional instance IDs. Zero are accepted at parse time because
    # `--all-mine` is the alternative way of selecting instances.
    parser.add_argument("instances", nargs="*", help="Instance IDs to destroy")

    # Boolean switches, registered in the order they should be listed.
    switches = (
        (
            ("--all-mine",),
            "Destroy all of your instances (incompatible with specifying instance IDs)",
        ),
        (
            ("-y", "--yes"),
            "Don't ask for confirmation before destroying",
        ),
    )
    for option_strings, description in switches:
        parser.add_argument(*option_strings, action="store_true", help=description)

    # Format handed to `print_instances` when listing what will be destroyed.
    parser.add_argument("--output-format", default="table", choices=["table", "csv"])
def run(args: argparse.Namespace) ‑> None
-
Expand source code Browse git
def run(args: argparse.Namespace) -> None:
    """Terminate the selected EC2 instances after listing and confirming them.

    Instances are selected either by explicit ID (`args.instances`) or, with
    `args.all_mine`, by a `LaunchedBy` tag equal to `whoami()`. In both cases
    only instances in the pending, running, stopping, or stopped state are
    considered. If nothing matches, a message is printed and the function
    returns without prompting.

    Args:
        args: Parsed arguments as registered by `configure_parser`.

    Raises:
        SystemExit: With status 1 if `--all-mine` is combined with instance
            IDs, or if neither is supplied; with status 0 if the user
            declines the confirmation prompt.
    """
    # NOTE(review): presumably validates required environment/configuration
    # before talking to AWS — confirm against its definition.
    check_required_vars()

    # Validate the argument combination up front, before any AWS call.
    if args.all_mine and args.instances:
        print(
            "scratch: error: cannot specify --all-mine and instance IDs",
            file=sys.stderr,
        )
        sys.exit(1)
    if not args.all_mine and not args.instances:
        print(
            "scratch: error: must supply at least one instance ID to destroy",
            file=sys.stderr,
        )
        sys.exit(1)

    # The state filter applies to both selection modes.
    filters: list[FilterTypeDef] = [
        {
            "Name": "instance-state-name",
            "Values": ["pending", "running", "stopping", "stopped"],
        }
    ]
    instance_ids: list[str] = []
    if args.all_mine:
        filters.append({"Name": "tag:LaunchedBy", "Values": [whoami()]})
    else:
        instance_ids.extend(args.instances)

    # Materialize the collection so the listing and the termination loop
    # operate on exactly the same set of instances.
    instances = list(
        boto3.resource("ec2").instances.filter(
            Filters=filters, InstanceIds=instance_ids
        )
    )

    # Nothing matched: do not ask for confirmation or claim that anything
    # was destroyed.
    if not instances:
        print("No matching instances to destroy.")
        return

    print("Destroying instances:")
    print_instances(instances, args.output_format)

    if not args.yes and not ui.confirm("Would you like to continue?"):
        sys.exit(0)

    for instance in instances:
        instance.terminate()
    print("Instances destroyed.")