Module materialize.cli.scratch.destroy

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import argparse
import sys

import boto3
from mypy_boto3_ec2.type_defs import FilterTypeDef

from materialize.cli.scratch import check_required_vars
from materialize.scratch import print_instances, ui, whoami


def configure_parser(parser: argparse.ArgumentParser) -> None:
    """Register the command-line arguments for destroying instances.

    Adds to ``parser`` a positional list of instance IDs, the ``--all-mine``
    and ``-y``/``--yes`` boolean switches, and an ``--output-format`` choice
    (``table`` or ``csv``, defaulting to ``table``).
    """
    parser.add_argument("instances", nargs="*", help="Instance IDs to destroy")

    # Both switches are plain booleans that default to False.
    boolean_switches = (
        (
            ("--all-mine",),
            "Destroy all of your instances (incompatible with specifying instance IDs)",
        ),
        (
            ("-y", "--yes"),
            "Don't ask for confirmation before destroying",
        ),
    )
    for flags, description in boolean_switches:
        parser.add_argument(*flags, action="store_true", help=description)

    parser.add_argument(
        "--output-format",
        default="table",
        choices=["table", "csv"],
    )


def run(args: argparse.Namespace) -> None:
    """Terminate the scratch EC2 instances selected on the command line.

    Instances are selected either by explicit ID (``args.instances``) or,
    with ``args.all_mine``, by a ``LaunchedBy`` tag equal to ``whoami()``.
    Only instances in the pending, running, stopping, or stopped states are
    considered. The matching instances are printed in ``args.output_format``
    and, unless ``args.yes`` is set, the user is asked to confirm before
    anything is terminated.

    Exits with status 1 if both ``--all-mine`` and instance IDs are given, or
    if neither is given. Exits with status 0 if the user declines the
    confirmation prompt. Returns without prompting or terminating anything
    when no instance matches the selection.
    """
    check_required_vars()

    instance_ids: list[str] = []
    filters: list[FilterTypeDef] = [
        {
            "Name": "instance-state-name",
            "Values": ["pending", "running", "stopping", "stopped"],
        }
    ]
    if args.all_mine:
        if args.instances:
            print(
                "scratch: error: cannot specify --all-mine and instance IDs",
                file=sys.stderr,
            )
            sys.exit(1)
        filters.append({"Name": "tag:LaunchedBy", "Values": [whoami()]})
    elif args.instances:
        instance_ids.extend(args.instances)
    else:
        print(
            "scratch: error: must supply at least one instance ID to destroy",
            file=sys.stderr,
        )
        sys.exit(1)

    instances = list(
        boto3.resource("ec2").instances.filter(
            Filters=filters, InstanceIds=instance_ids
        )
    )

    if not instances:
        # Nothing matched the state/tag/ID filters. Avoid asking the user to
        # confirm an empty list and then claiming instances were destroyed.
        print("No matching instances to destroy.")
        return

    print("Destroying instances:")
    print_instances(instances, args.output_format)

    if not args.yes and not ui.confirm("Would you like to continue?"):
        sys.exit(0)

    for instance in instances:
        instance.terminate()
    print("Instances destroyed.")

Functions

def configure_parser(parser: argparse.ArgumentParser) ‑> None
Expand source code Browse git
def configure_parser(parser: argparse.ArgumentParser) -> None:
    """Register the command-line arguments for destroying instances.

    Adds to ``parser`` a positional list of instance IDs, the ``--all-mine``
    and ``-y``/``--yes`` boolean switches, and an ``--output-format`` choice
    (``table`` or ``csv``, defaulting to ``table``).
    """
    parser.add_argument("instances", nargs="*", help="Instance IDs to destroy")

    # Both switches are plain booleans that default to False.
    boolean_switches = (
        (
            ("--all-mine",),
            "Destroy all of your instances (incompatible with specifying instance IDs)",
        ),
        (
            ("-y", "--yes"),
            "Don't ask for confirmation before destroying",
        ),
    )
    for flags, description in boolean_switches:
        parser.add_argument(*flags, action="store_true", help=description)

    parser.add_argument(
        "--output-format",
        default="table",
        choices=["table", "csv"],
    )
def run(args: argparse.Namespace) ‑> None
Expand source code Browse git
def run(args: argparse.Namespace) -> None:
    """Terminate the scratch EC2 instances selected on the command line.

    Instances are selected either by explicit ID (``args.instances``) or,
    with ``args.all_mine``, by a ``LaunchedBy`` tag equal to ``whoami()``.
    Only instances in the pending, running, stopping, or stopped states are
    considered. The matching instances are printed in ``args.output_format``
    and, unless ``args.yes`` is set, the user is asked to confirm before
    anything is terminated.

    Exits with status 1 if both ``--all-mine`` and instance IDs are given, or
    if neither is given. Exits with status 0 if the user declines the
    confirmation prompt. Returns without prompting or terminating anything
    when no instance matches the selection.
    """
    check_required_vars()

    instance_ids: list[str] = []
    filters: list[FilterTypeDef] = [
        {
            "Name": "instance-state-name",
            "Values": ["pending", "running", "stopping", "stopped"],
        }
    ]
    if args.all_mine:
        if args.instances:
            print(
                "scratch: error: cannot specify --all-mine and instance IDs",
                file=sys.stderr,
            )
            sys.exit(1)
        filters.append({"Name": "tag:LaunchedBy", "Values": [whoami()]})
    elif args.instances:
        instance_ids.extend(args.instances)
    else:
        print(
            "scratch: error: must supply at least one instance ID to destroy",
            file=sys.stderr,
        )
        sys.exit(1)

    instances = list(
        boto3.resource("ec2").instances.filter(
            Filters=filters, InstanceIds=instance_ids
        )
    )

    if not instances:
        # Nothing matched the state/tag/ID filters. Avoid asking the user to
        # confirm an empty list and then claiming instances were destroyed.
        print("No matching instances to destroy.")
        return

    print("Destroying instances:")
    print_instances(instances, args.output_format)

    if not args.yes and not ui.confirm("Would you like to continue?"):
        sys.exit(0)

    for instance in instances:
        instance.terminate()
    print("Instances destroyed.")