Module materialize.checks.all_checks.pg_cdc
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import re
from random import Random
from textwrap import dedent
from typing import Any
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.mz_version import MzVersion
class PgCdcBase:
    """Shared logic for the Postgres CDC checks.

    Every upstream and Materialize object created here carries ``suffix``
    (``_true`` or ``_false``, derived from ``wait``) so that the waiting and
    non-waiting variants can run side by side without colliding.
    """

    base_version: MzVersion
    current_version: MzVersion
    wait: bool
    suffix: str
    repeats: int
    expects: int

    def __init__(self, wait: bool, **kwargs: Any) -> None:
        """Configure the variant and forward the remaining arguments.

        Args:
            wait: If true, ``manipulate()`` inserts queries that wait for the
                sources' tables to become non-empty ("Wait until Pg snapshot is
                complete") and shorter ``f3`` values are used (``repeats`` =
                1024); if false, no waits are inserted and ``repeats`` = 16384.
            **kwargs: Passed unchanged to the next class in the MRO
                (``Check`` for the concrete subclasses).
        """
        self.wait = wait
        self.repeats = 1024 if wait else 16384
        # Each letter group inserts f3 = REPEAT(letter, repeats - i) for
        # i in generate_series(1,100), so the per-group SUM(LENGTH(f3)) checked
        # in validate() is this sum (97350 when waiting, 1633350 otherwise).
        # Deriving it keeps it in sync with `repeats`.
        self.expects = sum(self.repeats - i for i in range(1, 101))
        self.suffix = f"_{str(wait).lower()}"
        super().__init__(**kwargs)  # forward unused args to Check

    def initialize(self) -> Testdrive:
        """Create the upstream user, table and publication plus the first Mz connection.

        The table is seeded with the 'A' letter group only; the remaining
        groups are written by ``manipulate()``.
        """
        return Testdrive(
            dedent(
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres1{self.suffix} WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres1{self.suffix} WITH replication;
                DROP PUBLICATION IF EXISTS postgres_source{self.suffix};
                DROP TABLE IF EXISTS postgres_source_table{self.suffix};
                CREATE TABLE postgres_source_table{self.suffix} (f1 TEXT, f2 INTEGER, f3 TEXT UNIQUE NOT NULL, PRIMARY KEY(f1, f2));
                ALTER TABLE postgres_source_table{self.suffix} REPLICA IDENTITY FULL;
                INSERT INTO postgres_source_table{self.suffix} SELECT 'A', i, REPEAT('A', {self.repeats} - i) FROM generate_series(1,100) AS i;
                CREATE PUBLICATION postgres_source{self.suffix} FOR ALL TABLES;

                > CREATE SECRET pgpass1{self.suffix} AS 'postgres';

                > CREATE CONNECTION pg1{self.suffix} FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1{self.suffix},
                  PASSWORD SECRET pgpass1{self.suffix}
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return the two manipulation phases.

        Phase 1 creates sources 1 and 2 (exposing ``postgres_source_tableA`` and
        ``postgres_source_tableB``) and the per-variant count view while letter
        groups B-D are written upstream. Phase 2 creates source 3 (exposing
        ``postgres_source_tableC``) while groups E-H are written. Each group
        insert is followed by an UPDATE adding 100 to ``f2`` of every row, which
        is what the ``max(f2)`` expectations in ``validate()`` encode.

        NOTE(review): ``pg2`` authenticates with ``pgpass1`` even though
        ``pgpass2`` is created right before it (``pg3`` uses ``pgpass3``) --
        confirm whether this cross-phase secret dependency is intentional.
        The 'H' group fills ``f3`` with 'X'; only lengths are validated, so
        this does not affect the expectations.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                f"""
                > CREATE SOURCE postgres_source1{self.suffix}
                  FROM POSTGRES CONNECTION pg1{self.suffix}
                  (PUBLICATION 'postgres_source{self.suffix}')
                  FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableA{self.suffix});

                > CREATE DEFAULT INDEX ON postgres_source_tableA{self.suffix};

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'B', i, REPEAT('B', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;

                > CREATE SECRET pgpass2{self.suffix} AS 'postgres';

                > CREATE CONNECTION pg2{self.suffix} FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1{self.suffix},
                  PASSWORD SECRET pgpass1{self.suffix}

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'C', i, REPEAT('C', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;

                $[version>=5200] postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}}
                GRANT USAGE ON CONNECTION pg2{self.suffix} TO materialize

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'D', i, REPEAT('D', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;

                > CREATE SOURCE postgres_source2{self.suffix}
                  FROM POSTGRES CONNECTION pg2{self.suffix}
                  (PUBLICATION 'postgres_source{self.suffix}')
                  FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableB{self.suffix});

                # Create a view with a complex dependency structure
                > CREATE VIEW IF NOT EXISTS table_a_b_count_sum{self.suffix} AS SELECT SUM(total_count) AS total_rows FROM (
                    SELECT COUNT(*) AS total_count FROM postgres_source_tableA{self.suffix}
                    UNION ALL
                    SELECT COUNT(*) AS total_count FROM postgres_source_tableB{self.suffix}
                  );
                """
                + (
                    f"""
                # Wait until Pg snapshot is complete in order to avoid #18940
                > SELECT COUNT(*) > 0 FROM postgres_source_tableA{self.suffix}
                true

                # Wait until Pg snapshot is complete in order to avoid #18940
                > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix}
                true
                """
                    if self.wait
                    else ""
                ),
                f"""
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'E', i, REPEAT('E', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'F', i, REPEAT('F', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;

                > CREATE SECRET pgpass3{self.suffix} AS 'postgres';

                > CREATE CONNECTION pg3{self.suffix} FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres1{self.suffix},
                  PASSWORD SECRET pgpass3{self.suffix}

                > CREATE SOURCE postgres_source3{self.suffix}
                  FROM POSTGRES CONNECTION pg3{self.suffix}
                  (PUBLICATION 'postgres_source{self.suffix}')
                  FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableC{self.suffix});

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'G', i, REPEAT('G', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_source_table{self.suffix} SELECT 'H', i, REPEAT('X', {self.repeats} - i) FROM generate_series(1,100) AS i;
                UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100;
                """
                + (
                    f"""
                # Wait until Pg snapshot is complete in order to avoid #18940
                > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix}
                true

                > SELECT COUNT(*) > 0 FROM postgres_source_tableC{self.suffix}
                true
                """
                    if self.wait
                    else ""
                ),
            ]
        ]

    def validate(self) -> Testdrive:
        """Check that all three Mz copies of the upstream table hold the expected data.

        Each copy must contain all eight letter groups with the expected
        ``max(f2)`` and ``SUM(LENGTH(f3))``, and this variant's count view must
        report 1600 rows (800 in each of tables A and B). When the base version
        is at least v0.50.0 the Postgres primary key must also show up as the
        default index key and be used by a fast-path ``EXPLAIN``. The expected
        ``Target cluster`` line is stripped when the current version is older
        than v0.96.0 (presumably because those versions do not print it).
        """
        sql = dedent(
            f"""
            $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}}
            GRANT SELECT ON postgres_source_tableA{self.suffix} TO materialize
            GRANT SELECT ON postgres_source_tableB{self.suffix} TO materialize
            GRANT SELECT ON postgres_source_tableC{self.suffix} TO materialize

            # Can take longer after a restart
            $ set-sql-timeout duration=600s

            > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableA{self.suffix} GROUP BY f1;
            A 800 {self.expects}
            B 800 {self.expects}
            C 700 {self.expects}
            D 600 {self.expects}
            E 500 {self.expects}
            F 400 {self.expects}
            G 300 {self.expects}
            H 200 {self.expects}

            > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableB{self.suffix} GROUP BY f1;
            A 800 {self.expects}
            B 800 {self.expects}
            C 700 {self.expects}
            D 600 {self.expects}
            E 500 {self.expects}
            F 400 {self.expects}
            G 300 {self.expects}
            H 200 {self.expects}

            > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableC{self.suffix} GROUP BY f1;
            A 800 {self.expects}
            B 800 {self.expects}
            C 700 {self.expects}
            D 600 {self.expects}
            E 500 {self.expects}
            F 400 {self.expects}
            G 300 {self.expects}
            H 200 {self.expects}

            > SELECT total_rows FROM table_a_b_count_sum{self.suffix};
            1600
            """
        )

        if self.base_version >= MzVersion.parse_mz("v0.50.0-dev"):
            sql += dedent(
                f"""
                # Confirm that the primary key information has been propagated from Pg
                > SELECT key FROM (SHOW INDEXES ON postgres_source_tableA{self.suffix});
                {{f1,f2}}

                ? EXPLAIN SELECT DISTINCT f1, f2 FROM postgres_source_tableA{self.suffix};
                Explained Query (fast path):
                  Project (#0, #1)
                    ReadIndex on=materialize.public.postgres_source_tablea{self.suffix} postgres_source_tablea{self.suffix}_primary_idx=[*** full scan ***]

                Used Indexes:
                  - materialize.public.postgres_source_tablea{self.suffix}_primary_idx (*** full scan ***)

                Target cluster: quickstart
                """
            )

        if self.current_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)

        return Testdrive(sql)
@externally_idempotent(False)
class PgCdc(PgCdcBase, Check):
    """Postgres CDC check variant that waits for source snapshots during manipulate()."""

    def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
        # Pick the waiting variant; everything else is handed on to Check.
        super().__init__(base_version=base_version, rng=rng, wait=True)
@externally_idempotent(False)
class PgCdcNoWait(PgCdcBase, Check):
    """Postgres CDC check variant that does not wait for source snapshots."""

    def __init__(self, base_version: MzVersion, rng: Random | None) -> None:
        # Pick the non-waiting variant; everything else is handed on to Check.
        super().__init__(base_version=base_version, rng=rng, wait=False)
@externally_idempotent(False)
class PgCdcMzNow(Check):
    """Postgres CDC check for a temporal ``mz_now()`` filter over replicated rows.

    Rows are stamped with Postgres ``NOW()`` when inserted or updated, and the
    materialized view keeps only rows whose ``f1`` is at most 60 seconds old.
    """

    def initialize(self) -> Testdrive:
        """Create the upstream table with rows A1-E1, then the source and temporal view."""
        return Testdrive(
            dedent(
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres';
                ALTER USER postgres2 WITH replication;
                DROP PUBLICATION IF EXISTS postgres_mz_now_publication;
                DROP TABLE IF EXISTS postgres_mz_now_table;
                CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2));
                ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL;
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1');
                CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES;

                > CREATE SECRET postgres_mz_now_pass AS 'postgres';

                > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES
                  HOST 'postgres',
                  DATABASE postgres,
                  USER postgres2,
                  PASSWORD SECRET postgres_mz_now_pass

                > CREATE SOURCE postgres_mz_now_source
                  FROM POSTGRES CONNECTION postgres_mz_now_conn
                  (PUBLICATION 'postgres_mz_now_publication')
                  FOR TABLES (postgres_mz_now_table);

                # Return all rows fresher than 60 seconds
                > CREATE MATERIALIZED VIEW postgres_mz_now_view AS
                  SELECT * FROM postgres_mz_now_table
                  WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000)
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return two phases, each inserting five rows, deleting one and refreshing one timestamp."""
        return [
            Testdrive(dedent(s))
            for s in [
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B1';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1';
                """,
                """
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B2';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1';
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check the replicated row count and the temporal view, then undo the changes.

        The table must hold 13 rows: 15 inserted across initialize() and
        manipulate(), minus the deleted B1 and B2. A fourth batch is then
        written so the view can be checked for fresh rows. Finally, B3 is
        re-inserted and the fourth batch deleted, restoring the row count so
        that validate() can run again.
        """
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*) FROM postgres_mz_now_table;
                13

                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4');
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4');
                DELETE FROM postgres_mz_now_table WHERE f2 = 'B3';
                UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1';

                # Expect some rows newer than 60 seconds in view
                > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view
                  WHERE f1 > NOW() - INTERVAL '60' SECOND;
                true

                # Expect no rows older than 60 seconds in view
                > SELECT COUNT(*) FROM postgres_mz_now_view
                  WHERE f1 < NOW() - INTERVAL '60' SECOND;
                0

                # Rollback the last INSERTs so that validate() can be called multiple times
                $ postgres-execute connection=postgres://postgres:postgres@postgres
                INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3');
                DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%';
                """
            )
        )
def remove_target_cluster_from_explain(sql: str) -> str:
    """Strip ``Target cluster: <name>`` lines from expected EXPLAIN output.

    Any blank lines directly before such a line are removed with it. The
    newline that ended the removed line is kept, so text following it is not
    glued onto the preceding line. A matching line at the very end of ``sql``
    (with no trailing newline) is removed as well.

    Args:
        sql: A testdrive script that may contain such lines.

    Returns:
        ``sql`` with every matching line removed.
    """
    # The lookahead leaves the terminating newline in place, which also lets
    # consecutive matching lines each be removed.
    return re.sub(r"\n\s*Target cluster: \w+(?=\n|$)", "", sql)
Functions
def remove_target_cluster_from_explain(sql: str) ‑> str
-
Expand source code Browse git
def remove_target_cluster_from_explain(sql: str) -> str: return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
Classes
class PgCdc (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@externally_idempotent(False) class PgCdc(PgCdcBase, Check): def __init__(self, base_version: MzVersion, rng: Random | None) -> None: super().__init__(wait=True, base_version=base_version, rng=rng)
Ancestors
PgCdcBase
materialize.checks.checks.Check
Class variables
var externally_idempotent : bool
class PgCdcBase (wait: bool, **kwargs: Any)
-
Expand source code Browse git
class PgCdcBase: base_version: MzVersion current_version: MzVersion wait: bool suffix: str repeats: int expects: int def __init__(self, wait: bool, **kwargs: Any) -> None: self.wait = wait self.repeats = 1024 if wait else 16384 self.expects = 97350 if wait else 1633350 self.suffix = f"_{str(wait).lower()}" super().__init__(**kwargs) # foward unused args to Check def initialize(self) -> Testdrive: return Testdrive( dedent( f""" $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE USER postgres1{self.suffix} WITH SUPERUSER PASSWORD 'postgres'; ALTER USER postgres1{self.suffix} WITH replication; DROP PUBLICATION IF EXISTS postgres_source{self.suffix}; DROP TABLE IF EXISTS postgres_source_table{self.suffix}; CREATE TABLE postgres_source_table{self.suffix} (f1 TEXT, f2 INTEGER, f3 TEXT UNIQUE NOT NULL, PRIMARY KEY(f1, f2)); ALTER TABLE postgres_source_table{self.suffix} REPLICA IDENTITY FULL; INSERT INTO postgres_source_table{self.suffix} SELECT 'A', i, REPEAT('A', {self.repeats} - i) FROM generate_series(1,100) AS i; CREATE PUBLICATION postgres_source{self.suffix} FOR ALL TABLES; > CREATE SECRET pgpass1{self.suffix} AS 'postgres'; > CREATE CONNECTION pg1{self.suffix} FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres1{self.suffix}, PASSWORD SECRET pgpass1{self.suffix} """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" > CREATE SOURCE postgres_source1{self.suffix} FROM POSTGRES CONNECTION pg1{self.suffix} (PUBLICATION 'postgres_source{self.suffix}') FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableA{self.suffix}); > CREATE DEFAULT INDEX ON postgres_source_tableA{self.suffix}; $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'B', i, REPEAT('B', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SECRET pgpass2{self.suffix} AS 
'postgres'; > CREATE CONNECTION pg2{self.suffix} FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres1{self.suffix}, PASSWORD SECRET pgpass1{self.suffix} $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'C', i, REPEAT('C', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; $[version>=5200] postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}} GRANT USAGE ON CONNECTION pg2{self.suffix} TO materialize $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'D', i, REPEAT('D', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SOURCE postgres_source2{self.suffix} FROM POSTGRES CONNECTION pg2{self.suffix} (PUBLICATION 'postgres_source{self.suffix}') FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableB{self.suffix}); # Create a view with a complex dependency structure > CREATE VIEW IF NOT EXISTS table_a_b_count_sum AS SELECT SUM(total_count) AS total_rows FROM ( SELECT COUNT(*) AS total_count FROM postgres_source_tableA{self.suffix} UNION ALL SELECT COUNT(*) AS total_count FROM postgres_source_tableB{self.suffix} ); """ + ( f""" # Wait until Pg snapshot is complete in order to avoid #18940 > SELECT COUNT(*) > 0 FROM postgres_source_tableA{self.suffix} true # Wait until Pg snapshot is complete in order to avoid #18940 > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix} true """ if self.wait else "" ), f""" $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'E', i, REPEAT('E', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; $ postgres-execute 
connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'F', i, REPEAT('F', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SECRET pgpass3{self.suffix} AS 'postgres'; > CREATE CONNECTION pg3{self.suffix} FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres1{self.suffix}, PASSWORD SECRET pgpass3{self.suffix} > CREATE SOURCE postgres_source3{self.suffix} FROM POSTGRES CONNECTION pg3{self.suffix} (PUBLICATION 'postgres_source{self.suffix}') FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableC{self.suffix}); $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'G', i, REPEAT('G', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'H', i, REPEAT('X', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; """ + ( f""" # Wait until Pg snapshot is complete in order to avoid #18940 > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix} true > SELECT COUNT(*) > 0 FROM postgres_source_tableC{self.suffix} true """ if self.wait else "" ), ] ] def validate(self) -> Testdrive: sql = dedent( f""" $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}} GRANT SELECT ON postgres_source_tableA{self.suffix} TO materialize GRANT SELECT ON postgres_source_tableB{self.suffix} TO materialize GRANT SELECT ON postgres_source_tableC{self.suffix} TO materialize # Can take longer after a restart $ set-sql-timeout duration=600s > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableA{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 
{self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableB{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableC{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT total_rows FROM table_a_b_count_sum; 1600 """ ) if self.base_version >= MzVersion.parse_mz("v0.50.0-dev"): sql += dedent( f""" # Confirm that the primary key information has been propagated from Pg > SELECT key FROM (SHOW INDEXES ON postgres_source_tableA{self.suffix}); {{f1,f2}} ? EXPLAIN SELECT DISTINCT f1, f2 FROM postgres_source_tableA{self.suffix}; Explained Query (fast path): Project (#0, #1) ReadIndex on=materialize.public.postgres_source_tablea{self.suffix} postgres_source_tablea{self.suffix}_primary_idx=[*** full scan ***] Used Indexes: - materialize.public.postgres_source_tablea{self.suffix}_primary_idx (*** full scan ***) Target cluster: quickstart """ ) if self.current_version < MzVersion.parse_mz("v0.96.0-dev"): sql = remove_target_cluster_from_explain(sql) return Testdrive(sql)
Subclasses
PgCdc
PgCdcNoWait
Class variables
var base_version : MzVersion
var current_version : MzVersion
var expects : int
var repeats : int
var suffix : str
var wait : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( f""" $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE USER postgres1{self.suffix} WITH SUPERUSER PASSWORD 'postgres'; ALTER USER postgres1{self.suffix} WITH replication; DROP PUBLICATION IF EXISTS postgres_source{self.suffix}; DROP TABLE IF EXISTS postgres_source_table{self.suffix}; CREATE TABLE postgres_source_table{self.suffix} (f1 TEXT, f2 INTEGER, f3 TEXT UNIQUE NOT NULL, PRIMARY KEY(f1, f2)); ALTER TABLE postgres_source_table{self.suffix} REPLICA IDENTITY FULL; INSERT INTO postgres_source_table{self.suffix} SELECT 'A', i, REPEAT('A', {self.repeats} - i) FROM generate_series(1,100) AS i; CREATE PUBLICATION postgres_source{self.suffix} FOR ALL TABLES; > CREATE SECRET pgpass1{self.suffix} AS 'postgres'; > CREATE CONNECTION pg1{self.suffix} FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres1{self.suffix}, PASSWORD SECRET pgpass1{self.suffix} """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" > CREATE SOURCE postgres_source1{self.suffix} FROM POSTGRES CONNECTION pg1{self.suffix} (PUBLICATION 'postgres_source{self.suffix}') FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableA{self.suffix}); > CREATE DEFAULT INDEX ON postgres_source_tableA{self.suffix}; $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'B', i, REPEAT('B', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SECRET pgpass2{self.suffix} AS 'postgres'; > CREATE CONNECTION pg2{self.suffix} FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres1{self.suffix}, PASSWORD SECRET pgpass1{self.suffix} $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'C', i, REPEAT('C', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; $[version>=5200] postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}} GRANT USAGE ON CONNECTION pg2{self.suffix} TO materialize $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'D', i, REPEAT('D', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SOURCE postgres_source2{self.suffix} FROM POSTGRES CONNECTION pg2{self.suffix} (PUBLICATION 'postgres_source{self.suffix}') FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableB{self.suffix}); # Create a view with a complex dependency structure > CREATE VIEW IF NOT EXISTS table_a_b_count_sum AS SELECT SUM(total_count) AS total_rows FROM ( SELECT COUNT(*) AS total_count FROM postgres_source_tableA{self.suffix} UNION ALL SELECT COUNT(*) AS total_count 
FROM postgres_source_tableB{self.suffix} ); """ + ( f""" # Wait until Pg snapshot is complete in order to avoid #18940 > SELECT COUNT(*) > 0 FROM postgres_source_tableA{self.suffix} true # Wait until Pg snapshot is complete in order to avoid #18940 > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix} true """ if self.wait else "" ), f""" $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'E', i, REPEAT('E', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'F', i, REPEAT('F', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; > CREATE SECRET pgpass3{self.suffix} AS 'postgres'; > CREATE CONNECTION pg3{self.suffix} FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres1{self.suffix}, PASSWORD SECRET pgpass3{self.suffix} > CREATE SOURCE postgres_source3{self.suffix} FROM POSTGRES CONNECTION pg3{self.suffix} (PUBLICATION 'postgres_source{self.suffix}') FOR TABLES (postgres_source_table{self.suffix} AS postgres_source_tableC{self.suffix}); $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'G', i, REPEAT('G', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_source_table{self.suffix} SELECT 'H', i, REPEAT('X', {self.repeats} - i) FROM generate_series(1,100) AS i; UPDATE postgres_source_table{self.suffix} SET f2 = f2 + 100; """ + ( f""" # Wait until Pg snapshot is complete in order to avoid #18940 > SELECT COUNT(*) > 0 FROM postgres_source_tableB{self.suffix} true > SELECT COUNT(*) > 0 FROM 
postgres_source_tableC{self.suffix} true """ if self.wait else "" ), ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: sql = dedent( f""" $ postgres-execute connection=postgres://mz_system@${{testdrive.materialize-internal-sql-addr}} GRANT SELECT ON postgres_source_tableA{self.suffix} TO materialize GRANT SELECT ON postgres_source_tableB{self.suffix} TO materialize GRANT SELECT ON postgres_source_tableC{self.suffix} TO materialize # Can take longer after a restart $ set-sql-timeout duration=600s > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableA{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableB{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT f1, max(f2), SUM(LENGTH(f3)) FROM postgres_source_tableC{self.suffix} GROUP BY f1; A 800 {self.expects} B 800 {self.expects} C 700 {self.expects} D 600 {self.expects} E 500 {self.expects} F 400 {self.expects} G 300 {self.expects} H 200 {self.expects} > SELECT total_rows FROM table_a_b_count_sum; 1600 """ ) if self.base_version >= MzVersion.parse_mz("v0.50.0-dev"): sql += dedent( f""" # Confirm that the primary key information has been propagated from Pg > SELECT key FROM (SHOW INDEXES ON postgres_source_tableA{self.suffix}); {{f1,f2}} ? 
EXPLAIN SELECT DISTINCT f1, f2 FROM postgres_source_tableA{self.suffix}; Explained Query (fast path): Project (#0, #1) ReadIndex on=materialize.public.postgres_source_tablea{self.suffix} postgres_source_tablea{self.suffix}_primary_idx=[*** full scan ***] Used Indexes: - materialize.public.postgres_source_tablea{self.suffix}_primary_idx (*** full scan ***) Target cluster: quickstart """ ) if self.current_version < MzVersion.parse_mz("v0.96.0-dev"): sql = remove_target_cluster_from_explain(sql) return Testdrive(sql)
class PgCdcMzNow (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@externally_idempotent(False) class PgCdcMzNow(Check): def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres'; ALTER USER postgres2 WITH replication; DROP PUBLICATION IF EXISTS postgres_mz_now_publication; DROP TABLE IF EXISTS postgres_mz_now_table; CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2)); ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL; INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1'); CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES; > CREATE SECRET postgres_mz_now_pass AS 'postgres'; > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres2, PASSWORD SECRET postgres_mz_now_pass > CREATE SOURCE postgres_mz_now_source FROM POSTGRES CONNECTION postgres_mz_now_conn (PUBLICATION 'postgres_mz_now_publication') FOR TABLES (postgres_mz_now_table); # Return all rows fresher than 60 seconds > CREATE MATERIALIZED VIEW postgres_mz_now_view AS SELECT * FROM postgres_mz_now_table WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000) """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ """ $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2'); DELETE FROM postgres_mz_now_table WHERE f2 = 'B1'; UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1'; """, """ 
$ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3'); DELETE FROM postgres_mz_now_table WHERE f2 = 'B2'; UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1'; """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT COUNT(*) FROM postgres_mz_now_table; 13 $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4'); DELETE FROM postgres_mz_now_table WHERE f2 = 'B3'; UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1' # Expect some rows newer than 60 seconds in view > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view WHERE f1 > NOW() - INTERVAL '60' SECOND; true # Expect no rows older than 60 seconds in view > SELECT COUNT(*) FROM postgres_mz_now_view WHERE f1 < NOW() - INTERVAL '60' SECOND; 0 # Rollback the last INSERTs so that validate() can be called multiple times $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3'); DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%'; """ ) )
Ancestors
materialize.checks.checks.Check
Class variables
var externally_idempotent : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ postgres-execute connection=postgres://postgres:postgres@postgres CREATE USER postgres2 WITH SUPERUSER PASSWORD 'postgres'; ALTER USER postgres2 WITH replication; DROP PUBLICATION IF EXISTS postgres_mz_now_publication; DROP TABLE IF EXISTS postgres_mz_now_table; CREATE TABLE postgres_mz_now_table (f1 TIMESTAMP, f2 CHAR(5), PRIMARY KEY (f1, f2)); ALTER TABLE postgres_mz_now_table REPLICA IDENTITY FULL; INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D1'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E1'); CREATE PUBLICATION postgres_mz_now_publication FOR ALL TABLES; > CREATE SECRET postgres_mz_now_pass AS 'postgres'; > CREATE CONNECTION postgres_mz_now_conn FOR POSTGRES HOST 'postgres', DATABASE postgres, USER postgres2, PASSWORD SECRET postgres_mz_now_pass > CREATE SOURCE postgres_mz_now_source FROM POSTGRES CONNECTION postgres_mz_now_conn (PUBLICATION 'postgres_mz_now_publication') FOR TABLES (postgres_mz_now_table); # Return all rows fresher than 60 seconds > CREATE MATERIALIZED VIEW postgres_mz_now_view AS SELECT * FROM postgres_mz_now_table WHERE mz_now() <= ROUND(EXTRACT(epoch FROM f1 + INTERVAL '60' SECOND) * 1000) """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ """ $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D2'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E2'); DELETE FROM postgres_mz_now_table WHERE f2 = 'B1'; UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'C1'; """, """ $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D3'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E3'); DELETE FROM postgres_mz_now_table WHERE f2 = 'B2'; UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'D1'; """, ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT COUNT(*) FROM postgres_mz_now_table; 13 $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'A4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'C4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'D4'); INSERT INTO postgres_mz_now_table VALUES (NOW(), 'E4'); DELETE FROM postgres_mz_now_table WHERE f2 = 'B3'; UPDATE postgres_mz_now_table SET f1 = NOW() WHERE f2 = 'E1' # Expect some rows newer than 60 seconds in view > SELECT COUNT(*) >= 6 FROM postgres_mz_now_view WHERE f1 > NOW() - INTERVAL '60' SECOND; true # Expect no rows older than 60 seconds in view > SELECT COUNT(*) FROM postgres_mz_now_view WHERE f1 < NOW() - INTERVAL '60' SECOND; 0 # Rollback the last INSERTs so that validate() can be called multiple times $ postgres-execute connection=postgres://postgres:postgres@postgres INSERT INTO postgres_mz_now_table VALUES (NOW(), 'B3'); DELETE FROM postgres_mz_now_table WHERE f2 LIKE '%4%'; """ ) )
class PgCdcNoWait (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@externally_idempotent(False) class PgCdcNoWait(PgCdcBase, Check): def __init__(self, base_version: MzVersion, rng: Random | None) -> None: super().__init__(wait=False, base_version=base_version, rng=rng)
Ancestors
PgCdcBase
materialize.checks.checks.Check
Class variables
var externally_idempotent : bool