Module materialize.checks.all_checks.kafka_protocols

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


@externally_idempotent(False)
class KafkaProtocols(Check):
    def _can_run(self, e: Executor) -> bool:
        return self.base_version >= MzVersion.parse_mz("v0.78.0-dev")

    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ set-from-file ca-crt=/share/secrets/ca.crt
                $ set-from-file kafka-crt=/share/secrets/materialized-kafka.crt
                $ set-from-file kafka-key=/share/secrets/materialized-kafka.key
                $ set-from-file kafka1-crt=/share/secrets/materialized-kafka1.crt
                $ set-from-file kafka1-key=/share/secrets/materialized-kafka1.key

                > CREATE SECRET kafka_key AS '${kafka-key}'
                > CREATE SECRET kafka1_key AS '${kafka1-key}'
                > CREATE SECRET garbage_key AS 'garbage'

                $ kafka-create-topic topic=kafka-protocols-1
                $ kafka-ingest topic=kafka-protocols-1 format=bytes
                one

                $ kafka-create-topic topic=kafka-protocols-2
                $ kafka-ingest topic=kafka-protocols-2 format=bytes
                one

                $ kafka-create-topic topic=kafka-protocols-3
                $ kafka-ingest topic=kafka-protocols-3 format=bytes
                one

                > CREATE SECRET kafka_protocols_password AS 'sekurity'

                > CREATE CONNECTION kafka_plaintext TO KAFKA (
                    BROKER 'kafka:9093' USING SSH TUNNEL ssh_tunnel_0,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE CONNECTION kafka_ssl TO KAFKA (
                    BROKER 'kafka:9094',
                    SSL CERTIFICATE '${kafka-crt}',
                    SSL KEY SECRET kafka_key,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE CONNECTION kafka_scram_sha_512 TO KAFKA (
                    BROKER 'kafka:9095' USING SSH TUNNEL ssh_tunnel_0,
                    SASL MECHANISMS 'SCRAM-SHA-512',
                    SASL USERNAME 'materialize',
                    SASL PASSWORD SECRET kafka_protocols_password,
                    SECURITY PROTOCOL SASL_PLAINTEXT
                  )

                > CREATE CONNECTION kafka_ssl_scram_sha_512 TO KAFKA (
                    BROKER 'kafka:9096',
                    SASL MECHANISMS 'SCRAM-SHA-512',
                    SASL USERNAME 'materialize',
                    SASL PASSWORD SECRET kafka_protocols_password,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE CONNECTION kafka_sasl TO KAFKA (
                    BROKER 'kafka:9097' USING SSH TUNNEL ssh_tunnel_0,
                    SASL MECHANISMS 'PLAIN',
                    SASL USERNAME 'materialize',
                    SASL PASSWORD SECRET kafka_protocols_password,
                    SSL CERTIFICATE '${kafka-crt}',
                    SSL KEY SECRET kafka_key,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE SOURCE kafka_plaintext_1 FROM KAFKA CONNECTION kafka_plaintext (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_1 FROM KAFKA CONNECTION kafka_ssl (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_scram_sha_512_1 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_scram_sha_512_1 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_sasl_1 FROM KAFKA CONNECTION kafka_sasl (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(dedent(s))
            for s in [
                """
                > CREATE SOURCE kafka_plaintext_2 FROM KAFKA CONNECTION kafka_plaintext (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_2 FROM KAFKA CONNECTION kafka_ssl (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_scram_sha_512_2 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_scram_sha_512_2 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_sasl_2 FROM KAFKA CONNECTION kafka_sasl (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                $ kafka-ingest topic=kafka-protocols-1 format=bytes
                two

                $ kafka-ingest topic=kafka-protocols-2 format=bytes
                two

                $ kafka-ingest topic=kafka-protocols-3 format=bytes
                two
                """,
                """
                > CREATE SOURCE kafka_plaintext_3 FROM KAFKA CONNECTION kafka_plaintext (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_3 FROM KAFKA CONNECTION kafka_ssl (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_scram_sha_512_3 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_scram_sha_512_3 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_sasl_3 FROM KAFKA CONNECTION kafka_sasl (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                $ kafka-ingest topic=kafka-protocols-1 format=bytes
                three

                $ kafka-ingest topic=kafka-protocols-2 format=bytes
                three

                $ kafka-ingest topic=kafka-protocols-3 format=bytes
                three
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT * FROM kafka_plaintext_1
                one
                two
                three

                > SELECT * FROM kafka_ssl_1
                one
                two
                three

                > SELECT * FROM kafka_scram_sha_512_1
                one
                two
                three

                > SELECT * FROM kafka_ssl_scram_sha_512_1
                one
                two
                three

                > SELECT * FROM kafka_sasl_1
                one
                two
                three

                > SELECT * FROM kafka_plaintext_2
                one
                two
                three

                > SELECT * FROM kafka_ssl_2
                one
                two
                three

                > SELECT * FROM kafka_scram_sha_512_2
                one
                two
                three

                > SELECT * FROM kafka_ssl_scram_sha_512_2
                one
                two
                three

                > SELECT * FROM kafka_sasl_2
                one
                two
                three

                > SELECT * FROM kafka_plaintext_3
                one
                two
                three

                > SELECT * FROM kafka_ssl_3
                one
                two
                three

                > SELECT * FROM kafka_scram_sha_512_3
                one
                two
                three

                > SELECT * FROM kafka_ssl_scram_sha_512_3
                one
                two
                three

                > SELECT * FROM kafka_sasl_3
                one
                two
                three
                """
            )
        )

Classes

class KafkaProtocols (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
@externally_idempotent(False)
class KafkaProtocols(Check):
    def _can_run(self, e: Executor) -> bool:
        return self.base_version >= MzVersion.parse_mz("v0.78.0-dev")

    def initialize(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                $ set-from-file ca-crt=/share/secrets/ca.crt
                $ set-from-file kafka-crt=/share/secrets/materialized-kafka.crt
                $ set-from-file kafka-key=/share/secrets/materialized-kafka.key
                $ set-from-file kafka1-crt=/share/secrets/materialized-kafka1.crt
                $ set-from-file kafka1-key=/share/secrets/materialized-kafka1.key

                > CREATE SECRET kafka_key AS '${kafka-key}'
                > CREATE SECRET kafka1_key AS '${kafka1-key}'
                > CREATE SECRET garbage_key AS 'garbage'

                $ kafka-create-topic topic=kafka-protocols-1
                $ kafka-ingest topic=kafka-protocols-1 format=bytes
                one

                $ kafka-create-topic topic=kafka-protocols-2
                $ kafka-ingest topic=kafka-protocols-2 format=bytes
                one

                $ kafka-create-topic topic=kafka-protocols-3
                $ kafka-ingest topic=kafka-protocols-3 format=bytes
                one

                > CREATE SECRET kafka_protocols_password AS 'sekurity'

                > CREATE CONNECTION kafka_plaintext TO KAFKA (
                    BROKER 'kafka:9093' USING SSH TUNNEL ssh_tunnel_0,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE CONNECTION kafka_ssl TO KAFKA (
                    BROKER 'kafka:9094',
                    SSL CERTIFICATE '${kafka-crt}',
                    SSL KEY SECRET kafka_key,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE CONNECTION kafka_scram_sha_512 TO KAFKA (
                    BROKER 'kafka:9095' USING SSH TUNNEL ssh_tunnel_0,
                    SASL MECHANISMS 'SCRAM-SHA-512',
                    SASL USERNAME 'materialize',
                    SASL PASSWORD SECRET kafka_protocols_password,
                    SECURITY PROTOCOL SASL_PLAINTEXT
                  )

                > CREATE CONNECTION kafka_ssl_scram_sha_512 TO KAFKA (
                    BROKER 'kafka:9096',
                    SASL MECHANISMS 'SCRAM-SHA-512',
                    SASL USERNAME 'materialize',
                    SASL PASSWORD SECRET kafka_protocols_password,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE CONNECTION kafka_sasl TO KAFKA (
                    BROKER 'kafka:9097' USING SSH TUNNEL ssh_tunnel_0,
                    SASL MECHANISMS 'PLAIN',
                    SASL USERNAME 'materialize',
                    SASL PASSWORD SECRET kafka_protocols_password,
                    SSL CERTIFICATE '${kafka-crt}',
                    SSL KEY SECRET kafka_key,
                    SSL CERTIFICATE AUTHORITY '${ca-crt}'
                  )

                > CREATE SOURCE kafka_plaintext_1 FROM KAFKA CONNECTION kafka_plaintext (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_1 FROM KAFKA CONNECTION kafka_ssl (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_scram_sha_512_1 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_scram_sha_512_1 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_sasl_1 FROM KAFKA CONNECTION kafka_sasl (
                    TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
                  ) FORMAT TEXT
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        return [
            Testdrive(dedent(s))
            for s in [
                """
                > CREATE SOURCE kafka_plaintext_2 FROM KAFKA CONNECTION kafka_plaintext (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_2 FROM KAFKA CONNECTION kafka_ssl (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_scram_sha_512_2 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_scram_sha_512_2 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_sasl_2 FROM KAFKA CONNECTION kafka_sasl (
                    TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
                  ) FORMAT TEXT

                $ kafka-ingest topic=kafka-protocols-1 format=bytes
                two

                $ kafka-ingest topic=kafka-protocols-2 format=bytes
                two

                $ kafka-ingest topic=kafka-protocols-3 format=bytes
                two
                """,
                """
                > CREATE SOURCE kafka_plaintext_3 FROM KAFKA CONNECTION kafka_plaintext (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_3 FROM KAFKA CONNECTION kafka_ssl (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_scram_sha_512_3 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_ssl_scram_sha_512_3 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                > CREATE SOURCE kafka_sasl_3 FROM KAFKA CONNECTION kafka_sasl (
                    TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
                  ) FORMAT TEXT

                $ kafka-ingest topic=kafka-protocols-1 format=bytes
                three

                $ kafka-ingest topic=kafka-protocols-2 format=bytes
                three

                $ kafka-ingest topic=kafka-protocols-3 format=bytes
                three
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        return Testdrive(
            dedent(
                """
                > SELECT * FROM kafka_plaintext_1
                one
                two
                three

                > SELECT * FROM kafka_ssl_1
                one
                two
                three

                > SELECT * FROM kafka_scram_sha_512_1
                one
                two
                three

                > SELECT * FROM kafka_ssl_scram_sha_512_1
                one
                two
                three

                > SELECT * FROM kafka_sasl_1
                one
                two
                three

                > SELECT * FROM kafka_plaintext_2
                one
                two
                three

                > SELECT * FROM kafka_ssl_2
                one
                two
                three

                > SELECT * FROM kafka_scram_sha_512_2
                one
                two
                three

                > SELECT * FROM kafka_ssl_scram_sha_512_2
                one
                two
                three

                > SELECT * FROM kafka_sasl_2
                one
                two
                three

                > SELECT * FROM kafka_plaintext_3
                one
                two
                three

                > SELECT * FROM kafka_ssl_3
                one
                two
                three

                > SELECT * FROM kafka_scram_sha_512_3
                one
                two
                three

                > SELECT * FROM kafka_ssl_scram_sha_512_3
                one
                two
                three

                > SELECT * FROM kafka_sasl_3
                one
                two
                three
                """
            )
        )

Ancestors

Class variables

var externally_idempotent : bool

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            $ set-from-file ca-crt=/share/secrets/ca.crt
            $ set-from-file kafka-crt=/share/secrets/materialized-kafka.crt
            $ set-from-file kafka-key=/share/secrets/materialized-kafka.key
            $ set-from-file kafka1-crt=/share/secrets/materialized-kafka1.crt
            $ set-from-file kafka1-key=/share/secrets/materialized-kafka1.key

            > CREATE SECRET kafka_key AS '${kafka-key}'
            > CREATE SECRET kafka1_key AS '${kafka1-key}'
            > CREATE SECRET garbage_key AS 'garbage'

            $ kafka-create-topic topic=kafka-protocols-1
            $ kafka-ingest topic=kafka-protocols-1 format=bytes
            one

            $ kafka-create-topic topic=kafka-protocols-2
            $ kafka-ingest topic=kafka-protocols-2 format=bytes
            one

            $ kafka-create-topic topic=kafka-protocols-3
            $ kafka-ingest topic=kafka-protocols-3 format=bytes
            one

            > CREATE SECRET kafka_protocols_password AS 'sekurity'

            > CREATE CONNECTION kafka_plaintext TO KAFKA (
                BROKER 'kafka:9093' USING SSH TUNNEL ssh_tunnel_0,
                SSL CERTIFICATE AUTHORITY '${ca-crt}'
              )

            > CREATE CONNECTION kafka_ssl TO KAFKA (
                BROKER 'kafka:9094',
                SSL CERTIFICATE '${kafka-crt}',
                SSL KEY SECRET kafka_key,
                SSL CERTIFICATE AUTHORITY '${ca-crt}'
              )

            > CREATE CONNECTION kafka_scram_sha_512 TO KAFKA (
                BROKER 'kafka:9095' USING SSH TUNNEL ssh_tunnel_0,
                SASL MECHANISMS 'SCRAM-SHA-512',
                SASL USERNAME 'materialize',
                SASL PASSWORD SECRET kafka_protocols_password,
                SECURITY PROTOCOL SASL_PLAINTEXT
              )

            > CREATE CONNECTION kafka_ssl_scram_sha_512 TO KAFKA (
                BROKER 'kafka:9096',
                SASL MECHANISMS 'SCRAM-SHA-512',
                SASL USERNAME 'materialize',
                SASL PASSWORD SECRET kafka_protocols_password,
                SSL CERTIFICATE AUTHORITY '${ca-crt}'
              )

            > CREATE CONNECTION kafka_sasl TO KAFKA (
                BROKER 'kafka:9097' USING SSH TUNNEL ssh_tunnel_0,
                SASL MECHANISMS 'PLAIN',
                SASL USERNAME 'materialize',
                SASL PASSWORD SECRET kafka_protocols_password,
                SSL CERTIFICATE '${kafka-crt}',
                SSL KEY SECRET kafka_key,
                SSL CERTIFICATE AUTHORITY '${ca-crt}'
              )

            > CREATE SOURCE kafka_plaintext_1 FROM KAFKA CONNECTION kafka_plaintext (
                TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_ssl_1 FROM KAFKA CONNECTION kafka_ssl (
                TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_scram_sha_512_1 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_ssl_scram_sha_512_1 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_sasl_1 FROM KAFKA CONNECTION kafka_sasl (
                TOPIC 'testdrive-kafka-protocols-1-${testdrive.seed}'
              ) FORMAT TEXT
            """
        )
    )
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    return [
        Testdrive(dedent(s))
        for s in [
            """
            > CREATE SOURCE kafka_plaintext_2 FROM KAFKA CONNECTION kafka_plaintext (
                TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_ssl_2 FROM KAFKA CONNECTION kafka_ssl (
                TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_scram_sha_512_2 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_ssl_scram_sha_512_2 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_sasl_2 FROM KAFKA CONNECTION kafka_sasl (
                TOPIC 'testdrive-kafka-protocols-2-${testdrive.seed}'
              ) FORMAT TEXT

            $ kafka-ingest topic=kafka-protocols-1 format=bytes
            two

            $ kafka-ingest topic=kafka-protocols-2 format=bytes
            two

            $ kafka-ingest topic=kafka-protocols-3 format=bytes
            two
            """,
            """
            > CREATE SOURCE kafka_plaintext_3 FROM KAFKA CONNECTION kafka_plaintext (
                TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_ssl_3 FROM KAFKA CONNECTION kafka_ssl (
                TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_scram_sha_512_3 FROM KAFKA CONNECTION kafka_scram_sha_512 (
                TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_ssl_scram_sha_512_3 FROM KAFKA CONNECTION kafka_ssl_scram_sha_512 (
                TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
              ) FORMAT TEXT

            > CREATE SOURCE kafka_sasl_3 FROM KAFKA CONNECTION kafka_sasl (
                TOPIC 'testdrive-kafka-protocols-3-${testdrive.seed}'
              ) FORMAT TEXT

            $ kafka-ingest topic=kafka-protocols-1 format=bytes
            three

            $ kafka-ingest topic=kafka-protocols-2 format=bytes
            three

            $ kafka-ingest topic=kafka-protocols-3 format=bytes
            three
            """,
        ]
    ]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    return Testdrive(
        dedent(
            """
            > SELECT * FROM kafka_plaintext_1
            one
            two
            three

            > SELECT * FROM kafka_ssl_1
            one
            two
            three

            > SELECT * FROM kafka_scram_sha_512_1
            one
            two
            three

            > SELECT * FROM kafka_ssl_scram_sha_512_1
            one
            two
            three

            > SELECT * FROM kafka_sasl_1
            one
            two
            three

            > SELECT * FROM kafka_plaintext_2
            one
            two
            three

            > SELECT * FROM kafka_ssl_2
            one
            two
            three

            > SELECT * FROM kafka_scram_sha_512_2
            one
            two
            three

            > SELECT * FROM kafka_ssl_scram_sha_512_2
            one
            two
            three

            > SELECT * FROM kafka_sasl_2
            one
            two
            three

            > SELECT * FROM kafka_plaintext_3
            one
            two
            three

            > SELECT * FROM kafka_ssl_3
            one
            two
            three

            > SELECT * FROM kafka_scram_sha_512_3
            one
            two
            three

            > SELECT * FROM kafka_ssl_scram_sha_512_3
            one
            two
            three

            > SELECT * FROM kafka_sasl_3
            one
            two
            three
            """
        )
    )