Module materialize.checks.all_checks.webhook

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


def schemas() -> str:
    """Return ``KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD`` with its common leading whitespace removed."""
    schema_preamble = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
    return schema_preamble


class Webhook(Check):
    """Check covering webhook sources with TEXT, JSON and BYTES body formats.

    ``initialize`` creates a cluster plus one webhook source per body format
    and appends a first event to each, ``manipulate`` appends two further
    rounds of events, and ``validate`` asserts the column layout and the
    accumulated rows of all three sources.
    """

    def _can_run(self, e: Executor) -> bool:
        """Return whether the base version is at least v0.62.0-dev."""
        base = self.base_version
        minimum = MzVersion.parse_mz("v0.62.0-dev")
        return base >= minimum

    def enable(self) -> str:
        """Return the testdrive snippet that turns on webhook sources, if needed.

        For base versions below v0.76.0-dev the snippet sets the
        ``enable_webhook_sources`` system variable through the internal SQL
        address; for every other base version the empty string is returned.
        """
        base = self.base_version
        flag_required = base < MzVersion.parse_mz("v0.76.0-dev")
        if not flag_required:
            return ""

        return dedent(
            """
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_webhook_sources = true
                """
        )

    def initialize(self) -> Testdrive:
        """Create the cluster and the three webhook sources, then append one event to each.

        The script is the concatenation of the shared schema preamble, the
        optional feature-flag snippet from ``enable`` and the setup commands.
        """
        preamble = schemas()
        feature_flag = self.enable()
        setup = dedent(
            """
                > CREATE CLUSTER webhook_cluster REPLICAS (r1 (SIZE '1'));

                > CREATE SOURCE webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

                > CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;

                > CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES;

                $ webhook-append database=materialize schema=public name=webhook_text
                fooä

                $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=platform-checks1
                {
                  "hello": "wörld"
                }

                $ webhook-append database=materialize schema=public name=webhook_bytes
                \u0001
                """
        )
        return Testdrive(preamble + feature_flag + setup)

    def manipulate(self) -> list[Testdrive]:
        """Return two scripts, each appending one more event to every webhook source.

        Each script is prefixed with the shared schema preamble.
        """
        phase_scripts = [
            """
                $ webhook-append database=materialize schema=public name=webhook_text
                bar❤️

                $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=
                {
                  "still": 123,
                  "foo": []
                }

                $ webhook-append database=materialize schema=public name=webhook_bytes
                \u0000\u0000\u0000\u0000
                """,
            """
                $ webhook-append database=materialize schema=public name=webhook_text
                baz123

                $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=null
                [{"good": "bye"}, 42, null]

                $ webhook-append database=materialize schema=public name=webhook_bytes
                \u0001\u0002\u0003\u0004
                """,
        ]

        phases: list[Testdrive] = []
        for raw_script in phase_scripts:
            phases.append(Testdrive(schemas() + dedent(raw_script)))
        return phases

    def validate(self) -> Testdrive:
        """Return the script asserting the columns and rows of all three webhook sources.

        The JSON rows are selected one at a time by the value of the ``app``
        header that was sent with the corresponding event.
        """
        expectations = dedent(
            """
                > SHOW COLUMNS FROM webhook_text
                body false text

                > SHOW COLUMNS FROM webhook_json
                body false jsonb
                headers false map

                > SHOW COLUMNS FROM webhook_bytes
                body false bytea

                > SELECT * FROM webhook_text
                fooä
                bar❤️
                baz123

                > SELECT body FROM webhook_json WHERE headers -> 'app' = 'platform-checks1'
                "{\\"hello\\":\\"wörld\\"}"

                > SELECT body FROM webhook_json WHERE headers -> 'app' = ''
                "{\\"foo\\":[],\\"still\\":123}"

                > SELECT body FROM webhook_json WHERE headers -> 'app' = 'null'
                "[{\\"good\\":\\"bye\\"},42,null]"

                > SELECT * FROM webhook_bytes
                \\\\x00\\x00\\x00\\x00
                \\\\x01
                \\\\x01\\x02\\x03\\x04
           """
        )
        return Testdrive(expectations)

Functions

def schemas() ‑> str
Expand source code Browse git
def schemas() -> str:
    """Return ``KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD`` with its common leading whitespace removed."""
    schema_preamble = dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
    return schema_preamble

Classes

class Webhook (base_version: MzVersion, rng: random.Random | None)
Expand source code Browse git
class Webhook(Check):
    """Check covering webhook sources with TEXT, JSON and BYTES body formats.

    ``initialize`` creates a cluster plus one webhook source per body format
    and appends a first event to each, ``manipulate`` appends two further
    rounds of events, and ``validate`` asserts the column layout and the
    accumulated rows of all three sources.
    """

    def _can_run(self, e: Executor) -> bool:
        """Return whether the base version is at least v0.62.0-dev."""
        base = self.base_version
        minimum = MzVersion.parse_mz("v0.62.0-dev")
        return base >= minimum

    def enable(self) -> str:
        """Return the testdrive snippet that turns on webhook sources, if needed.

        For base versions below v0.76.0-dev the snippet sets the
        ``enable_webhook_sources`` system variable through the internal SQL
        address; for every other base version the empty string is returned.
        """
        base = self.base_version
        flag_required = base < MzVersion.parse_mz("v0.76.0-dev")
        if not flag_required:
            return ""

        return dedent(
            """
                $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
                ALTER SYSTEM SET enable_webhook_sources = true
                """
        )

    def initialize(self) -> Testdrive:
        """Create the cluster and the three webhook sources, then append one event to each.

        The script is the concatenation of the shared schema preamble, the
        optional feature-flag snippet from ``enable`` and the setup commands.
        """
        preamble = schemas()
        feature_flag = self.enable()
        setup = dedent(
            """
                > CREATE CLUSTER webhook_cluster REPLICAS (r1 (SIZE '1'));

                > CREATE SOURCE webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

                > CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;

                > CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES;

                $ webhook-append database=materialize schema=public name=webhook_text
                fooä

                $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=platform-checks1
                {
                  "hello": "wörld"
                }

                $ webhook-append database=materialize schema=public name=webhook_bytes
                \u0001
                """
        )
        return Testdrive(preamble + feature_flag + setup)

    def manipulate(self) -> list[Testdrive]:
        """Return two scripts, each appending one more event to every webhook source.

        Each script is prefixed with the shared schema preamble.
        """
        phase_scripts = [
            """
                $ webhook-append database=materialize schema=public name=webhook_text
                bar❤️

                $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=
                {
                  "still": 123,
                  "foo": []
                }

                $ webhook-append database=materialize schema=public name=webhook_bytes
                \u0000\u0000\u0000\u0000
                """,
            """
                $ webhook-append database=materialize schema=public name=webhook_text
                baz123

                $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=null
                [{"good": "bye"}, 42, null]

                $ webhook-append database=materialize schema=public name=webhook_bytes
                \u0001\u0002\u0003\u0004
                """,
        ]

        phases: list[Testdrive] = []
        for raw_script in phase_scripts:
            phases.append(Testdrive(schemas() + dedent(raw_script)))
        return phases

    def validate(self) -> Testdrive:
        """Return the script asserting the columns and rows of all three webhook sources.

        The JSON rows are selected one at a time by the value of the ``app``
        header that was sent with the corresponding event.
        """
        expectations = dedent(
            """
                > SHOW COLUMNS FROM webhook_text
                body false text

                > SHOW COLUMNS FROM webhook_json
                body false jsonb
                headers false map

                > SHOW COLUMNS FROM webhook_bytes
                body false bytea

                > SELECT * FROM webhook_text
                fooä
                bar❤️
                baz123

                > SELECT body FROM webhook_json WHERE headers -> 'app' = 'platform-checks1'
                "{\\"hello\\":\\"wörld\\"}"

                > SELECT body FROM webhook_json WHERE headers -> 'app' = ''
                "{\\"foo\\":[],\\"still\\":123}"

                > SELECT body FROM webhook_json WHERE headers -> 'app' = 'null'
                "[{\\"good\\":\\"bye\\"},42,null]"

                > SELECT * FROM webhook_bytes
                \\\\x00\\x00\\x00\\x00
                \\\\x01
                \\\\x01\\x02\\x03\\x04
           """
        )
        return Testdrive(expectations)

Ancestors

Methods

def enable(self) ‑> str
Expand source code Browse git
def enable(self) -> str:
    """Return the testdrive snippet that turns on webhook sources, if needed.

    For base versions below v0.76.0-dev the snippet sets the
    ``enable_webhook_sources`` system variable through the internal SQL
    address; for every other base version the empty string is returned.
    """
    base = self.base_version
    flag_required = base < MzVersion.parse_mz("v0.76.0-dev")
    if not flag_required:
        return ""

    return dedent(
        """
            $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
            ALTER SYSTEM SET enable_webhook_sources = true
            """
    )
def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    """Create the cluster and the three webhook sources, then append one event to each.

    The script is the concatenation of the shared schema preamble, the
    optional feature-flag snippet from ``enable`` and the setup commands.
    """
    preamble = schemas()
    feature_flag = self.enable()
    setup = dedent(
        """
            > CREATE CLUSTER webhook_cluster REPLICAS (r1 (SIZE '1'));

            > CREATE SOURCE webhook_text IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

            > CREATE SOURCE webhook_json IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT JSON INCLUDE HEADERS;

            > CREATE SOURCE webhook_bytes IN CLUSTER webhook_cluster FROM WEBHOOK BODY FORMAT BYTES;

            $ webhook-append database=materialize schema=public name=webhook_text
            fooä

            $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=platform-checks1
            {
              "hello": "wörld"
            }

            $ webhook-append database=materialize schema=public name=webhook_bytes
            \u0001
            """
    )
    return Testdrive(preamble + feature_flag + setup)
def manipulate(self) ‑> list[Testdrive]
Expand source code Browse git
def manipulate(self) -> list[Testdrive]:
    """Return two scripts, each appending one more event to every webhook source.

    Each script is prefixed with the shared schema preamble.
    """
    phase_scripts = [
        """
            $ webhook-append database=materialize schema=public name=webhook_text
            bar❤️

            $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=
            {
              "still": 123,
              "foo": []
            }

            $ webhook-append database=materialize schema=public name=webhook_bytes
            \u0000\u0000\u0000\u0000
            """,
        """
            $ webhook-append database=materialize schema=public name=webhook_text
            baz123

            $ webhook-append database=materialize schema=public name=webhook_json content-type=application/json app=null
            [{"good": "bye"}, 42, null]

            $ webhook-append database=materialize schema=public name=webhook_bytes
            \u0001\u0002\u0003\u0004
            """,
    ]

    phases: list[Testdrive] = []
    for raw_script in phase_scripts:
        phases.append(Testdrive(schemas() + dedent(raw_script)))
    return phases
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    """Return the script asserting the columns and rows of all three webhook sources.

    The JSON rows are selected one at a time by the value of the ``app``
    header that was sent with the corresponding event.
    """
    expectations = dedent(
        """
            > SHOW COLUMNS FROM webhook_text
            body false text

            > SHOW COLUMNS FROM webhook_json
            body false jsonb
            headers false map

            > SHOW COLUMNS FROM webhook_bytes
            body false bytea

            > SELECT * FROM webhook_text
            fooä
            bar❤️
            baz123

            > SELECT body FROM webhook_json WHERE headers -> 'app' = 'platform-checks1'
            "{\\"hello\\":\\"wörld\\"}"

            > SELECT body FROM webhook_json WHERE headers -> 'app' = ''
            "{\\"foo\\":[],\\"still\\":123}"

            > SELECT body FROM webhook_json WHERE headers -> 'app' = 'null'
            "[{\\"good\\":\\"bye\\"},42,null]"

            > SELECT * FROM webhook_bytes
            \\\\x00\\x00\\x00\\x00
            \\\\x01
            \\\\x01\\x02\\x03\\x04
       """
    )
    return Testdrive(expectations)