Module materialize.checks.pg_cdc

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

from textwrap import dedent
from typing import List

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check


class PgCdc(Check):
    """Postgres CDC check with three sources over one upstream table.

    The Postgres table ``postgres_source_table`` receives batches of 100 rows
    tagged 'A' through 'H'. Starting with batch 'B', every batch is followed
    by an UPDATE that increments ``f2`` on all rows. Three sources over the
    same publication are created at different points in that sequence, and
    ``validate`` expects all three to expose the same final contents.
    """

    def initialize(self) -> Testdrive:
        """Create the first secret/connection and seed the upstream table.

        Returns:
            A Testdrive action that creates secret ``pgpass1`` and connection
            ``pg1``, then (via ``postgres-execute``) creates user
            ``postgres1``, the table seeded with the 'A' batch, and the
            ``postgres_source`` publication.
        """
        return Testdrive(
            dedent(
                """
                > CREATE SECRET pgpass1 AS 'postgres';

                > CREATE CONNECTION pg1 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1,
                  PASSWORD SECRET pgpass1

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres1 WITH replication;
                DROP PUBLICATION IF EXISTS postgres_source;

                DROP TABLE IF EXISTS postgres_source_table;

                CREATE TABLE postgres_source_table (f1 TEXT, f2 INTEGER, f3 TEXT);
                ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;

                INSERT INTO postgres_source_table SELECT 'A', 1, REPEAT('X', 1024) FROM generate_series(1,100);

                CREATE PUBLICATION postgres_source FOR ALL TABLES;
                """
            )
        )

    def manipulate(self) -> List[Testdrive]:
        """Create the three sources interleaved with upstream writes.

        Returns:
            Two Testdrive actions. The first creates ``postgres_source1``,
            writes batches 'B' and 'C', and creates secret ``pgpass2`` and
            connection ``pg2``. The second writes batches 'D' through 'H'
            while creating ``postgres_source2``, secret ``pgpass3``,
            connection ``pg3`` and ``postgres_source3``.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Phase 1. Connection pg2 authenticates with pgpass2, the
                # secret created immediately before it, so that each
                # connection exercises its own secret (as pg1 and pg3 do).
                """
                > CREATE SOURCE postgres_source1
                  FROM POSTGRES CONNECTION pg1
                  (PUBLICATION 'postgres_source')
                  FOR TABLES (postgres_source_table AS postgres_source_tableA);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'B', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                > CREATE SECRET pgpass2 AS 'postgres';

                > CREATE CONNECTION pg2 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1,
                  PASSWORD SECRET pgpass2

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'C', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;
                """,
                # Phase 2: the remaining two sources are created while
                # batches 'D' through 'H' are being written upstream.
                """

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'D', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                > CREATE SOURCE postgres_source2
                  FROM POSTGRES CONNECTION pg2
                  (PUBLICATION 'postgres_source')
                  FOR TABLES (postgres_source_table AS postgres_source_tableB);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'E', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'F', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                > CREATE SECRET pgpass3 AS 'postgres';

                > CREATE CONNECTION pg3 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1,
                  PASSWORD SECRET pgpass3

                > CREATE SOURCE postgres_source3
                  FROM POSTGRES CONNECTION pg3
                  (PUBLICATION 'postgres_source')
                  FOR TABLES (postgres_source_table AS postgres_source_tableC);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'G', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;


                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'H', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Expect identical final contents in all three replicated tables.

        'A' and 'B' rows have gone through seven increments, and each later
        batch through one fewer. Every group sums to 100 rows * 1024
        characters = 102400.

        Returns:
            A Testdrive action querying ``postgres_source_tableA``,
            ``postgres_source_tableB`` and ``postgres_source_tableC``.
        """
        return Testdrive(
            dedent(
                """
                > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableA GROUP BY f1, f2;
                A 8 102400
                B 8 102400
                C 7 102400
                D 6 102400
                E 5 102400
                F 4 102400
                G 3 102400
                H 2 102400

                > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableB GROUP BY f1, f2;
                A 8 102400
                B 8 102400
                C 7 102400
                D 6 102400
                E 5 102400
                F 4 102400
                G 3 102400
                H 2 102400

                > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableC GROUP BY f1, f2;
                A 8 102400
                B 8 102400
                C 7 102400
                D 6 102400
                E 5 102400
                F 4 102400
                G 3 102400
                H 2 102400
                """
            )
        )


class PgCdcMzNow(Check):
    """Postgres CDC check combined with an ``mz_now()`` temporal filter.

    A Postgres table of (timestamp, label) rows is replicated through a
    source, and a materialized view keeps only the rows whose timestamp is
    less than 60 seconds old.
    """

    def initialize(self) -> Testdrive:
        """Create the upstream table, the source and the temporal view.

        Returns:
            A Testdrive action that creates the secret and connection, then
            (via ``postgres-execute``) creates user ``postgres2``, the table
            with rows A1..E1 and the publication, and finally creates the
            source and the ``postgres_mz_now_view`` materialized view.
        """
        return Testdrive(
            dedent(
                """
                > CREATE SECRET postgres_mz_now_pass AS 'postgres';

                > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres2,
                  PASSWORD SECRET postgres_mz_now_pass

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres2 WITH replication;
                DROP PUBLICATION IF EXISTS postgres_mz_now_publication;

                DROP TABLE IF EXISTS postgres_mz_now_table;

                CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2));
                ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL;

                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1');

                CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES;

                > CREATE SOURCE postgres_mz_now_source
                  FROM POSTGRES CONNECTION postgres_mz_now_conn
                  (PUBLICATION 'postgres_mz_now_publication')
                  FOR ALL TABLES;

                # Return all rows fresher than 60 seconds
                > CREATE MATERIALIZED VIEW postgres_mz_now_view AS
                  SELECT * FROM postgres_mz_now_table
                  WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000)
                """
            )
        )

    def manipulate(self) -> List[Testdrive]:
        """Insert, delete and refresh rows upstream in two phases.

        Each phase inserts five new rows, deletes one row from an earlier
        batch and resets the timestamp of one of the original rows.

        Returns:
            Two Testdrive actions, one per phase.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Phase 1: add A2..E2, drop B1, refresh C1.
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B1';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1';
                """,
                # Phase 2: add A3..E3, drop B2, refresh D1.
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B2';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1';
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check the row count and the 60-second window of the view.

        The table is expected to hold 13 rows: the 5 initial rows plus a net
        4 from each manipulate phase. The action then writes fresh rows,
        checks that the view holds at least six recent rows (A4..E4 and the
        refreshed E1) and none older than 60 seconds, and finally undoes the
        inserts and the delete so the row count is back at 13 for a repeated
        run.

        Returns:
            A single Testdrive action performing the steps above.
        """
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*) FROM postgres_mz_now_table;
                13

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B3';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1';

                # Expect some rows newer than 60 seconds in view
                > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view
                  WHERE f1 > NOW() - INTERVAL '60' SECOND;
                true

                # Expect no rows older than 60 seconds in view
                > SELECT COUNT(*) FROM postgres_mz_now_view
                  WHERE f1 < NOW() - INTERVAL '60' SECOND;
                0

                # Rollback the last INSERTs so that validate() can be called multiple times
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%';
                """
            )
        )

Classes

class PgCdc
Expand source code Browse git
class PgCdc(Check):
    """Postgres CDC check with three sources over one upstream table.

    The Postgres table ``postgres_source_table`` receives batches of 100 rows
    tagged 'A' through 'H'. Starting with batch 'B', every batch is followed
    by an UPDATE that increments ``f2`` on all rows. Three sources over the
    same publication are created at different points in that sequence, and
    ``validate`` expects all three to expose the same final contents.
    """

    def initialize(self) -> Testdrive:
        """Create the first secret/connection and seed the upstream table.

        Returns:
            A Testdrive action that creates secret ``pgpass1`` and connection
            ``pg1``, then (via ``postgres-execute``) creates user
            ``postgres1``, the table seeded with the 'A' batch, and the
            ``postgres_source`` publication.
        """
        return Testdrive(
            dedent(
                """
                > CREATE SECRET pgpass1 AS 'postgres';

                > CREATE CONNECTION pg1 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1,
                  PASSWORD SECRET pgpass1

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres1 WITH replication;
                DROP PUBLICATION IF EXISTS postgres_source;

                DROP TABLE IF EXISTS postgres_source_table;

                CREATE TABLE postgres_source_table (f1 TEXT, f2 INTEGER, f3 TEXT);
                ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;

                INSERT INTO postgres_source_table SELECT 'A', 1, REPEAT('X', 1024) FROM generate_series(1,100);

                CREATE PUBLICATION postgres_source FOR ALL TABLES;
                """
            )
        )

    def manipulate(self) -> List[Testdrive]:
        """Create the three sources interleaved with upstream writes.

        Returns:
            Two Testdrive actions. The first creates ``postgres_source1``,
            writes batches 'B' and 'C', and creates secret ``pgpass2`` and
            connection ``pg2``. The second writes batches 'D' through 'H'
            while creating ``postgres_source2``, secret ``pgpass3``,
            connection ``pg3`` and ``postgres_source3``.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Phase 1. Connection pg2 authenticates with pgpass2, the
                # secret created immediately before it, so that each
                # connection exercises its own secret (as pg1 and pg3 do).
                """
                > CREATE SOURCE postgres_source1
                  FROM POSTGRES CONNECTION pg1
                  (PUBLICATION 'postgres_source')
                  FOR TABLES (postgres_source_table AS postgres_source_tableA);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'B', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                > CREATE SECRET pgpass2 AS 'postgres';

                > CREATE CONNECTION pg2 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1,
                  PASSWORD SECRET pgpass2

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'C', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;
                """,
                # Phase 2: the remaining two sources are created while
                # batches 'D' through 'H' are being written upstream.
                """

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'D', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                > CREATE SOURCE postgres_source2
                  FROM POSTGRES CONNECTION pg2
                  (PUBLICATION 'postgres_source')
                  FOR TABLES (postgres_source_table AS postgres_source_tableB);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'E', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'F', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;

                > CREATE SECRET pgpass3 AS 'postgres';

                > CREATE CONNECTION pg3 FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1,
                  PASSWORD SECRET pgpass3

                > CREATE SOURCE postgres_source3
                  FROM POSTGRES CONNECTION pg3
                  (PUBLICATION 'postgres_source')
                  FOR TABLES (postgres_source_table AS postgres_source_tableC);

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'G', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;


                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table SELECT 'H', 1, REPEAT('X', 1024) FROM generate_series(1,100);
                UPDATE postgres_source_table SET f2 = f2 + 1;
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Expect identical final contents in all three replicated tables.

        'A' and 'B' rows have gone through seven increments, and each later
        batch through one fewer. Every group sums to 100 rows * 1024
        characters = 102400.

        Returns:
            A Testdrive action querying ``postgres_source_tableA``,
            ``postgres_source_tableB`` and ``postgres_source_tableC``.
        """
        return Testdrive(
            dedent(
                """
                > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableA GROUP BY f1, f2;
                A 8 102400
                B 8 102400
                C 7 102400
                D 6 102400
                E 5 102400
                F 4 102400
                G 3 102400
                H 2 102400

                > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableB GROUP BY f1, f2;
                A 8 102400
                B 8 102400
                C 7 102400
                D 6 102400
                E 5 102400
                F 4 102400
                G 3 102400
                H 2 102400

                > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableC GROUP BY f1, f2;
                A 8 102400
                B 8 102400
                C 7 102400
                D 6 102400
                E 5 102400
                F 4 102400
                G 3 102400
                H 2 102400
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    """Build the setup action for the Postgres CDC check.

    Returns:
        A Testdrive action that creates secret ``pgpass1`` and connection
        ``pg1``, then (via ``postgres-execute``) creates user ``postgres1``,
        the table ``postgres_source_table`` seeded with 100 'A' rows, and
        the ``postgres_source`` publication.
    """
    setup_action = Testdrive(
        dedent(
            """
            > CREATE SECRET pgpass1 AS 'postgres';

            > CREATE CONNECTION pg1 FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass1

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE USER postgres1 WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER postgres1 WITH replication;
            DROP PUBLICATION IF EXISTS postgres_source;

            DROP TABLE IF EXISTS postgres_source_table;

            CREATE TABLE postgres_source_table (f1 TEXT, f2 INTEGER, f3 TEXT);
            ALTER TABLE postgres_source_table REPLICA IDENTITY FULL;

            INSERT INTO postgres_source_table SELECT 'A', 1, REPEAT('X', 1024) FROM generate_series(1,100);

            CREATE PUBLICATION postgres_source FOR ALL TABLES;
            """
        )
    )
    return setup_action
def manipulate(self) ‑> List[Testdrive]
Expand source code Browse git
def manipulate(self) -> List[Testdrive]:
    """Create the three sources interleaved with upstream writes.

    Returns:
        Two Testdrive actions, in execution order. The first creates
        ``postgres_source1``, writes batches 'B' and 'C', and creates secret
        ``pgpass2`` and connection ``pg2``. The second writes batches 'D'
        through 'H' while creating ``postgres_source2``, secret ``pgpass3``,
        connection ``pg3`` and ``postgres_source3``.
    """
    # Connection pg2 authenticates with pgpass2, the secret created
    # immediately before it, so that each connection exercises its own
    # secret (as pg1 and pg3 do).
    first_phase = Testdrive(
        dedent(
            """
            > CREATE SOURCE postgres_source1
              FROM POSTGRES CONNECTION pg1
              (PUBLICATION 'postgres_source')
              FOR TABLES (postgres_source_table AS postgres_source_tableA);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'B', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;

            > CREATE SECRET pgpass2 AS 'postgres';

            > CREATE CONNECTION pg2 FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass2

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'C', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;
            """
        )
    )
    # The remaining two sources are created while batches 'D' through 'H'
    # are being written upstream.
    second_phase = Testdrive(
        dedent(
            """

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'D', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;

            > CREATE SOURCE postgres_source2
              FROM POSTGRES CONNECTION pg2
              (PUBLICATION 'postgres_source')
              FOR TABLES (postgres_source_table AS postgres_source_tableB);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'E', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'F', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;

            > CREATE SECRET pgpass3 AS 'postgres';

            > CREATE CONNECTION pg3 FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres1,
              PASSWORD SECRET pgpass3

            > CREATE SOURCE postgres_source3
              FROM POSTGRES CONNECTION pg3
              (PUBLICATION 'postgres_source')
              FOR TABLES (postgres_source_table AS postgres_source_tableC);

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'G', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;


            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_source_table SELECT 'H', 1, REPEAT('X', 1024) FROM generate_series(1,100);
            UPDATE postgres_source_table SET f2 = f2 + 1;
            """
        )
    )
    return [first_phase, second_phase]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    """Build the verification action for the Postgres CDC check.

    The same eight (f1, f2, summed f3 length) groups are expected from each
    of ``postgres_source_tableA``, ``postgres_source_tableB`` and
    ``postgres_source_tableC``.

    Returns:
        A Testdrive action running the three grouped queries.
    """
    verification = Testdrive(
        dedent(
            """
            > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableA GROUP BY f1, f2;
            A 8 102400
            B 8 102400
            C 7 102400
            D 6 102400
            E 5 102400
            F 4 102400
            G 3 102400
            H 2 102400

            > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableB GROUP BY f1, f2;
            A 8 102400
            B 8 102400
            C 7 102400
            D 6 102400
            E 5 102400
            F 4 102400
            G 3 102400
            H 2 102400

            > SELECT f1, f2, SUM(LENGTH(f3)) FROM postgres_source_tableC GROUP BY f1, f2;
            A 8 102400
            B 8 102400
            C 7 102400
            D 6 102400
            E 5 102400
            F 4 102400
            G 3 102400
            H 2 102400
       """
        )
    )
    return verification
class PgCdcMzNow
Expand source code Browse git
class PgCdcMzNow(Check):
    """Postgres CDC check combined with an ``mz_now()`` temporal filter.

    A Postgres table of (timestamp, label) rows is replicated through a
    source, and a materialized view keeps only the rows whose timestamp is
    less than 60 seconds old.
    """

    def initialize(self) -> Testdrive:
        """Create the upstream table, the source and the temporal view.

        Returns:
            A Testdrive action that creates the secret and connection, then
            (via ``postgres-execute``) creates user ``postgres2``, the table
            with rows A1..E1 and the publication, and finally creates the
            source and the ``postgres_mz_now_view`` materialized view.
        """
        return Testdrive(
            dedent(
                """
                > CREATE SECRET postgres_mz_now_pass AS 'postgres';

                > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres2,
                  PASSWORD SECRET postgres_mz_now_pass

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres2 WITH replication;
                DROP PUBLICATION IF EXISTS postgres_mz_now_publication;

                DROP TABLE IF EXISTS postgres_mz_now_table;

                CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2));
                ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL;

                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1');

                CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES;

                > CREATE SOURCE postgres_mz_now_source
                  FROM POSTGRES CONNECTION postgres_mz_now_conn
                  (PUBLICATION 'postgres_mz_now_publication')
                  FOR ALL TABLES;

                # Return all rows fresher than 60 seconds
                > CREATE MATERIALIZED VIEW postgres_mz_now_view AS
                  SELECT * FROM postgres_mz_now_table
                  WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000)
                """
            )
        )

    def manipulate(self) -> List[Testdrive]:
        """Insert, delete and refresh rows upstream in two phases.

        Each phase inserts five new rows, deletes one row from an earlier
        batch and resets the timestamp of one of the original rows.

        Returns:
            Two Testdrive actions, one per phase.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                # Phase 1: add A2..E2, drop B1, refresh C1.
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B1';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1';
                """,
                # Phase 2: add A3..E3, drop B2, refresh D1.
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B2';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1';
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check the row count and the 60-second window of the view.

        The table is expected to hold 13 rows: the 5 initial rows plus a net
        4 from each manipulate phase. The action then writes fresh rows,
        checks that the view holds at least six recent rows (A4..E4 and the
        refreshed E1) and none older than 60 seconds, and finally undoes the
        inserts and the delete so the row count is back at 13 for a repeated
        run.

        Returns:
            A single Testdrive action performing the steps above.
        """
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*) FROM postgres_mz_now_table;
                13

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B3';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1';

                # Expect some rows newer than 60 seconds in view
                > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view
                  WHERE f1 > NOW() - INTERVAL '60' SECOND;
                true

                # Expect no rows older than 60 seconds in view
                > SELECT COUNT(*) FROM postgres_mz_now_view
                  WHERE f1 < NOW() - INTERVAL '60' SECOND;
                0

                # Rollback the last INSERTs so that validate() can be called multiple times
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%';
                """
            )
        )

Ancestors

Methods

def initialize(self) ‑> Testdrive
Expand source code Browse git
def initialize(self) -> Testdrive:
    """Build the setup action for the ``mz_now()`` Postgres CDC check.

    Returns:
        A Testdrive action that creates the secret and connection, then (via
        ``postgres-execute``) creates user ``postgres2``, the table
        ``postgres_mz_now_table`` with rows A1..E1 and the publication, and
        finally creates the source and a materialized view that keeps rows
        fresher than 60 seconds.
    """
    setup_action = Testdrive(
        dedent(
            """
            > CREATE SECRET postgres_mz_now_pass AS 'postgres';

            > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES
              HOST 'postgres',
              DATABASE postgres,
              USER postgres2,
              PASSWORD SECRET postgres_mz_now_pass

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres';
            ALTER USER postgres2 WITH replication;
            DROP PUBLICATION IF EXISTS postgres_mz_now_publication;

            DROP TABLE IF EXISTS postgres_mz_now_table;

            CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2));
            ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL;

            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1');

            CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES;

            > CREATE SOURCE postgres_mz_now_source
              FROM POSTGRES CONNECTION postgres_mz_now_conn
              (PUBLICATION 'postgres_mz_now_publication')
              FOR ALL TABLES;

            # Return all rows fresher than 60 seconds
            > CREATE MATERIALIZED VIEW postgres_mz_now_view AS
              SELECT * FROM postgres_mz_now_table
              WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000)
            """
        )
    )
    return setup_action
def manipulate(self) ‑> List[Testdrive]
Expand source code Browse git
def manipulate(self) -> List[Testdrive]:
    """Build the two upstream-write phases of the ``mz_now()`` check.

    Each phase inserts five new rows, deletes one row from an earlier batch
    and resets the timestamp of one of the original rows.

    Returns:
        Two Testdrive actions, in execution order.
    """
    # Phase 1: add A2..E2, drop B1, refresh C1.
    first_phase = Testdrive(
        dedent(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2');
            DELETE FROM postgres_mz_now_table WHERE f2 = 'B1';
            UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1';
            """
        )
    )
    # Phase 2: add A3..E3, drop B2, refresh D1.
    second_phase = Testdrive(
        dedent(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3');
            DELETE FROM postgres_mz_now_table WHERE f2 = 'B2';
            UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1';
            """
        )
    )
    return [first_phase, second_phase]
def validate(self) ‑> Testdrive
Expand source code Browse git
def validate(self) -> Testdrive:
    """Check the row count and the 60-second window of the view.

    The table is expected to hold 13 rows. The action then writes fresh rows
    (A4..E4), deletes B3 and refreshes E1, checks that the view holds at
    least six recent rows and none older than 60 seconds, and finally
    re-inserts B3 and deletes the '4' rows so the row count is back at 13
    for a repeated run.

    Returns:
        A single Testdrive action performing the steps above.
    """
    return Testdrive(
        dedent(
            """
            > SELECT COUNT(*) FROM postgres_mz_now_table;
            13

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4');
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4');
            DELETE FROM postgres_mz_now_table WHERE f2 = 'B3';
            UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1';

            # Expect some rows newer than 60 seconds in view
            > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view
              WHERE f1 > NOW() - INTERVAL '60' SECOND;
            true

            # Expect no rows older than 60 seconds in view
            > SELECT COUNT(*) FROM postgres_mz_now_view
              WHERE f1 < NOW() - INTERVAL '60' SECOND;
            0

            # Rollback the last INSERTs so that validate() can be called multiple times
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
            DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%';
            """
        )
    )