Module materialize.checks.all_checks.retain_history
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import re
from textwrap import dedent
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, disabled
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
# How far back every object in this module retains history. It must exceed the
# total wall-clock time of all scenarios plus the CI build; otherwise the
# recorded AS OF timestamps fall outside the retained window.
RETAIN_HISTORY_DURATION: str = "60m"
def schemas() -> str:
    """Return the shared single-string-field Kafka schema preamble, dedented.

    The text comes from ``KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD`` and is
    prepended to the Kafka testdrive scripts in this module. Those scripts
    reference ``${keyschema}`` and ``${schema}``, which this preamble
    presumably defines (NOTE(review): confirm in ``materialize.checks.common``).
    """
    schema_preamble = KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
    return dedent(schema_preamble)
@disabled(
    "#24479 and compaction not predicable and now() not appropriate while mz_now() not applicable"
)
class RetainHistoryOnMv(Check):
    """Exercise ``RETAIN HISTORY`` on materialized views.

    ``initialize`` creates ``retain_history_table`` and ``retain_history_mv1``.
    ``manipulate`` mutates the table and creates two more views:
    ``retain_history_mv2`` over the table, and ``retain_history_mv3`` over
    ``retain_history_mv2``. Every view is created with
    ``RETAIN HISTORY FOR RETAIN_HISTORY_DURATION``.

    Around each step the current wall-clock time (``now()``) is stored in
    ``time_for_mv`` under an increasing ``time_index`` (0..11). ``validate``
    then queries every view ``AS OF`` each recorded time.
    """

    def _can_run(self, e: Executor) -> bool:
        """Only run when the Materialize version under test is v0.81.0 or newer."""
        return e.current_mz_version >= MzVersion.parse_mz("v0.81.0")

    def initialize(self) -> Testdrive:
        """Create the time-tracking table, the base table and mv1.

        Records ``time_index`` 0..3.
        """
        return Testdrive(
            dedent(
                f"""
                > CREATE TABLE time_for_mv (time_index INT, t TIMESTAMP);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (0, now());
                > CREATE TABLE retain_history_table (key INT, value INT);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (1, now());
                > INSERT INTO retain_history_table VALUES (1, 100), (2, 200), (3, 300);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (2, now());
                > CREATE MATERIALIZED VIEW retain_history_mv1 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS
                  SELECT * FROM retain_history_table;
                > SELECT count(*) FROM retain_history_mv1;
                3
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (3, now());
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Mutate the base table in two phases, creating mv2 and mv3 along the way.

        Phase 1 records ``time_index`` 4..7 and creates mv2.
        Phase 2 records ``time_index`` 8..11 and creates mv3 on top of mv2.
        """
        return [
            Testdrive(dedent(s))
            for s in [
                f"""
                > UPDATE retain_history_table SET value = value + 10 WHERE key = 1;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (4, now());
                > INSERT INTO retain_history_table VALUES (4, 400);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (5, now());
                > DELETE FROM retain_history_table WHERE key = 3;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (6, now());
                > CREATE MATERIALIZED VIEW retain_history_mv2 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS
                  SELECT * FROM retain_history_table;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (7, now());
                """,
                f"""
                > CREATE MATERIALIZED VIEW retain_history_mv3 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS
                  SELECT * FROM retain_history_mv2;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (8, now());
                > UPDATE retain_history_table SET value = value + 1;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (9, now());
                > INSERT INTO retain_history_table VALUES (5, 500);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (10, now());
                > DELETE FROM retain_history_table WHERE key = 4;
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_mv VALUES (11, now());
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check view contents AS OF every recorded time, plus DDL and EXPLAIN output.

        Each testdrive fragment is dedented on its own before the fragments
        are joined. A single ``dedent`` over the combined text only strips the
        smallest common indent, so any fragment written at a different
        indentation would keep leading spaces. Testdrive commands must start
        in column 0.
        """
        # initialize() inserts time_index 0..3 and manipulate() inserts 4..11;
        # expose each recorded time as the testdrive variable ${timeN}.
        recorded_time_count = 12
        time_definitions = "\n".join(
            f"$ set-from-sql var=time{index}\n"
            f"SELECT t::STRING FROM time_for_mv WHERE time_index = {index}"
            for index in range(recorded_time_count)
        )

        # The same expectations are applied to all three views.
        content_validations = "\n".join(
            dedent(
                f"""
                ! SELECT * FROM {mv_name} AS OF '${{time0}}'::TIMESTAMP; -- time0 (nothing exists)
                contains: is not valid for all inputs
                ! SELECT count(*) FROM {mv_name} AS OF '${{time1}}'::TIMESTAMP; -- time1 (table created)
                contains: is not valid for all inputs
                > SELECT * FROM {mv_name} AS OF '${{time2}}'::TIMESTAMP; -- time2 (table populated)
                1 100
                2 200
                3 300
                > SELECT * FROM {mv_name} AS OF '${{time3}}'::TIMESTAMP; -- time3 (mv1 created)
                1 100
                2 200
                3 300
                > SELECT * FROM {mv_name} AS OF '${{time4}}'::TIMESTAMP; -- time4 (table updated in manipulate#1)
                1 110
                2 200
                3 300
                > SELECT * FROM {mv_name} AS OF '${{time5}}'::TIMESTAMP; -- time5 (table updated in manipulate#1)
                1 110
                2 200
                3 300
                4 400
                > SELECT * FROM {mv_name} AS OF '${{time6}}'::TIMESTAMP; -- time6 (table updated in manipulate#1)
                1 110
                2 200
                4 400
                > SELECT * FROM {mv_name} AS OF '${{time7}}'::TIMESTAMP; -- time7 (mv2 created in manipulate#1)
                1 110
                2 200
                4 400
                > SELECT * FROM {mv_name} AS OF '${{time8}}'::TIMESTAMP; -- time8 (mv3 created in manipulate#2)
                1 110
                2 200
                4 400
                > SELECT * FROM {mv_name} AS OF '${{time9}}'::TIMESTAMP; -- time9 (table updated in manipulate#2)
                1 111
                2 201
                4 401
                > SELECT * FROM {mv_name} AS OF '${{time10}}'::TIMESTAMP; -- time10 (table updated in manipulate#2)
                1 111
                2 201
                4 401
                5 500
                > SELECT * FROM {mv_name} AS OF '${{time11}}'::TIMESTAMP; -- time11 (table updated in manipulate#2)
                1 111
                2 201
                5 500
                > SELECT * FROM {mv_name};
                1 111
                2 201
                5 500
                """
            )
            for mv_name in [
                "retain_history_mv1",
                "retain_history_mv2",
                "retain_history_mv3",
            ]
        )

        # The RETAIN HISTORY option must survive in the stored definitions.
        definition_validations = dedent(
            f"""
            > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv1);
            "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv1\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\""
            > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv2);
            "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv2\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\""
            > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv3);
            "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv3\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_mv2\\""
            """
        )

        other_validations = dedent(
            """
            ? EXPLAIN OPTIMIZED PLAN AS TEXT FOR SELECT * FROM retain_history_mv1
            Explained Query:
              ReadStorage materialize.public.retain_history_mv1

            Target cluster: quickstart
            """
        )
        # When the base version predates v0.96.0-dev, the expected EXPLAIN
        # output omits the "Target cluster" trailer.
        if self.base_version < MzVersion.parse_mz("v0.96.0-dev"):
            other_validations = remove_target_cluster_from_explain(other_validations)

        sections = [
            time_definitions,
            content_validations,
            definition_validations,
            other_validations,
        ]
        # Always end the script with a newline; removing the trailer above can
        # strip the final one.
        return Testdrive("\n".join(sections) + "\n")
@disabled(
    "#24479 and compaction not predicable and now() not appropriate while mz_now() not applicable"
)
class RetainHistoryOnKafkaSource(Check):
    """Exercise ``RETAIN HISTORY`` on an ``ENVELOPE UPSERT`` Kafka source.

    ``initialize`` creates the ``retain-history`` topic and ingests keys
    K0..K4 with A* values. It then creates ``retain_history_source`` with
    ``RETAIN HISTORY FOR RETAIN_HISTORY_DURATION``.

    ``manipulate`` overwrites values with B*, C* and D* records.

    Around each step the current wall-clock time (``now()``) is stored in
    ``time_for_source`` under an increasing ``time_index`` (0..7).
    ``validate`` queries the source ``AS OF`` each recorded time.

    NOTE(review): ``validate`` expects the fully ingested K0..K4/A* contents
    even at time0..time3. Those times were recorded before
    ``CREATE SOURCE`` ran. The check is currently disabled via ``@disabled``.
    """

    def _can_run(self, e: Executor) -> bool:
        """Only run when the Materialize version under test is v0.81.0 or newer."""
        return e.current_mz_version >= MzVersion.parse_mz("v0.81.0")

    def initialize(self) -> Testdrive:
        """Create the topic, ingest the A* records, and create the upsert source.

        Records ``time_index`` 0..4. The first ingest writes K0..K3 and the
        second writes K0..K4, both with A* values.
        """
        return Testdrive(
            schemas()
            + dedent(
                f"""
                > CREATE TABLE time_for_source (time_index INT, t TIMESTAMP);
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (0, now());
                $ kafka-create-topic topic=retain-history
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (1, now());
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=4
                {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (2, now());
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=5
                {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (3, now());
                > CREATE SOURCE retain_history_source
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-retain-history-${{testdrive.seed}}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT
                  WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}')
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (4, now());
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Overwrite topic records in two phases.

        Phase 1 writes B* values for K0..K1 and records ``time_index`` 5.
        Phase 2 writes C* values for K0..K5 (``time_index`` 6), then D0 for
        K0 (``time_index`` 7).
        """
        # Plain (non-f) strings: ${...} is passed through to testdrive as-is.
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=2
                {"key1": "K${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (5, now());
                """,
                """
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=6
                {"key1": "K${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (6, now());
                $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=1
                {"key1": "K${kafka-ingest.iteration}"} {"f1": "D${kafka-ingest.iteration}"}
                # Give it some time
                $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s"
                > INSERT INTO time_for_source VALUES (7, now());
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Query the source AS OF each recorded time (0..7) and at the current time.

        The recorded times are first loaded into testdrive variables
        ``${time0}``..``${time7}``.
        """
        return Testdrive(
            dedent(
                """
                $ set-from-sql var=time0
                SELECT t::STRING FROM time_for_source WHERE time_index = 0
                $ set-from-sql var=time1
                SELECT t::STRING FROM time_for_source WHERE time_index = 1
                $ set-from-sql var=time2
                SELECT t::STRING FROM time_for_source WHERE time_index = 2
                $ set-from-sql var=time3
                SELECT t::STRING FROM time_for_source WHERE time_index = 3
                $ set-from-sql var=time4
                SELECT t::STRING FROM time_for_source WHERE time_index = 4
                $ set-from-sql var=time5
                SELECT t::STRING FROM time_for_source WHERE time_index = 5
                $ set-from-sql var=time6
                SELECT t::STRING FROM time_for_source WHERE time_index = 6
                $ set-from-sql var=time7
                SELECT t::STRING FROM time_for_source WHERE time_index = 7
                > SELECT * FROM retain_history_source AS OF '${time0}'::TIMESTAMP; -- time0 (nothing exists)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time1}'::TIMESTAMP; -- time1 (topic created)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time2}'::TIMESTAMP; -- time2 (added data to topic)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time3}'::TIMESTAMP; -- time3 (further added data to topic)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time4}'::TIMESTAMP; -- time4 (created source)
                K0 A0
                K1 A1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time5}'::TIMESTAMP; -- time5 (updated data in topic in manipulate#1)
                K0 B0
                K1 B1
                K2 A2
                K3 A3
                K4 A4
                > SELECT * FROM retain_history_source AS OF '${time6}'::TIMESTAMP; -- time6 (updated data in topic in manipulate#2)
                K0 C0
                K1 C1
                K2 C2
                K3 C3
                K4 C4
                K5 C5
                > SELECT * FROM retain_history_source AS OF '${time7}'::TIMESTAMP; -- time7 (updated data in topic again in manipulate#2)
                K0 D0
                K1 C1
                K2 C2
                K3 C3
                K4 C4
                K5 C5
                > SELECT * FROM retain_history_source;
                K0 D0
                K1 C1
                K2 C2
                K3 C3
                K4 C4
                K5 C5
                """
            )
        )
def remove_target_cluster_from_explain(sql: str) -> str:
    """Remove ``Target cluster: <name>`` lines from expected EXPLAIN output.

    The match starts at the newline ending the preceding line and swallows any
    blank lines before the trailer. It is replaced by a single newline, so the
    preceding line keeps its line break. Replacing it with ``""`` would glue
    any following text onto that line (``"A\\nTarget cluster: c\\nB"`` would
    become ``"AB"``). At the end of a string it would also drop the final
    newline.

    Args:
        sql: Testdrive text that may contain a ``Target cluster:`` line.

    Returns:
        ``sql`` with every matching ``Target cluster:`` line removed. Text
        without such a line is returned unchanged.
    """
    return re.sub(r"\n\s*Target cluster: \w+\n", "\n", sql)
Functions
def remove_target_cluster_from_explain(sql: str) ‑> str
-
Expand source code Browse git
def remove_target_cluster_from_explain(sql: str) -> str: return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
def schemas() ‑> str
-
Expand source code Browse git
def schemas() -> str: return dedent(KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD)
Classes
class RetainHistoryOnKafkaSource (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@disabled( "#24479 and compaction not predicable and now() not appropriate while mz_now() not applicable" ) class RetainHistoryOnKafkaSource(Check): def _can_run(self, e: Executor) -> bool: return e.current_mz_version >= MzVersion.parse_mz("v0.81.0") def initialize(self) -> Testdrive: return Testdrive( schemas() + dedent( f""" > CREATE TABLE time_for_source (time_index INT, t TIMESTAMP); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (0, now()); $ kafka-create-topic topic=retain-history # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (1, now()); $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=4 {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (2, now()); $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=5 {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (3, now()); > CREATE SOURCE retain_history_source FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-retain-history-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (4, now()); """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(schemas() + dedent(s)) for s in [ """ $ kafka-ingest format=avro key-format=avro 
topic=retain-history key-schema=${keyschema} schema=${schema} repeat=2 {"key1": "K${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (5, now()); """, """ $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=6 {"key1": "K${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (6, now()); $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=1 {"key1": "K${kafka-ingest.iteration}"} {"f1": "D${kafka-ingest.iteration}"} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (7, now()); """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ $ set-from-sql var=time0 SELECT t::STRING FROM time_for_source WHERE time_index = 0 $ set-from-sql var=time1 SELECT t::STRING FROM time_for_source WHERE time_index = 1 $ set-from-sql var=time2 SELECT t::STRING FROM time_for_source WHERE time_index = 2 $ set-from-sql var=time3 SELECT t::STRING FROM time_for_source WHERE time_index = 3 $ set-from-sql var=time4 SELECT t::STRING FROM time_for_source WHERE time_index = 4 $ set-from-sql var=time5 SELECT t::STRING FROM time_for_source WHERE time_index = 5 $ set-from-sql var=time6 SELECT t::STRING FROM time_for_source WHERE time_index = 6 $ set-from-sql var=time7 SELECT t::STRING FROM time_for_source WHERE time_index = 7 > SELECT * FROM retain_history_source AS OF '${time0}'::TIMESTAMP; -- time0 (nothing exists) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time1}'::TIMESTAMP; -- time1 (topic created) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM 
retain_history_source AS OF '${time2}'::TIMESTAMP; -- time2 (added data to topic) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time3}'::TIMESTAMP; -- time3 (further added data to topic) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time4}'::TIMESTAMP; -- time4 (created source) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time5}'::TIMESTAMP; -- time5 (updated data in topic in manipulate#1) K0 B0 K1 B1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time6}'::TIMESTAMP; -- time6 (updated data in topic in manipulate#2) K0 C0 K1 C1 K2 C2 K3 C3 K4 C4 K5 C5 > SELECT * FROM retain_history_source AS OF '${time7}'::TIMESTAMP; -- time7 (updated data in topic again in manipulate#2) K0 D0 K1 C1 K2 C2 K3 C3 K4 C4 K5 C5 > SELECT * FROM retain_history_source; K0 D0 K1 C1 K2 C2 K3 C3 K4 C4 K5 C5 """ ) )
Ancestors
Class variables
var enabled : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( schemas() + dedent( f""" > CREATE TABLE time_for_source (time_index INT, t TIMESTAMP); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (0, now()); $ kafka-create-topic topic=retain-history # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (1, now()); $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=4 {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (2, now()); $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${{keyschema}} schema=${{schema}} repeat=5 {{"key1": "K${{kafka-ingest.iteration}}"}} {{"f1": "A${{kafka-ingest.iteration}}"}} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (3, now()); > CREATE SOURCE retain_history_source FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-retain-history-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (4, now()); """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(schemas() + dedent(s)) for s in [ """ $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=2 {"key1": "K${kafka-ingest.iteration}"} {"f1": "B${kafka-ingest.iteration}"} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (5, now()); """, """ $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=6 {"key1": "K${kafka-ingest.iteration}"} {"f1": "C${kafka-ingest.iteration}"} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (6, now()); $ kafka-ingest format=avro key-format=avro topic=retain-history key-schema=${keyschema} schema=${schema} repeat=1 {"key1": "K${kafka-ingest.iteration}"} {"f1": "D${kafka-ingest.iteration}"} # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_source VALUES (7, now()); """, ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ $ set-from-sql var=time0 SELECT t::STRING FROM time_for_source WHERE time_index = 0 $ set-from-sql var=time1 SELECT t::STRING FROM time_for_source WHERE time_index = 1 $ set-from-sql var=time2 SELECT t::STRING FROM time_for_source WHERE time_index = 2 $ set-from-sql var=time3 SELECT t::STRING FROM time_for_source WHERE time_index = 3 $ set-from-sql var=time4 SELECT t::STRING FROM time_for_source WHERE time_index = 4 $ set-from-sql var=time5 SELECT t::STRING FROM time_for_source WHERE time_index = 5 $ set-from-sql var=time6 SELECT t::STRING FROM time_for_source WHERE time_index = 6 $ set-from-sql var=time7 SELECT t::STRING FROM time_for_source WHERE time_index = 7 > SELECT * FROM retain_history_source AS OF '${time0}'::TIMESTAMP; -- time0 (nothing exists) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time1}'::TIMESTAMP; -- time1 (topic created) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time2}'::TIMESTAMP; -- time2 (added data to topic) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time3}'::TIMESTAMP; -- time3 (further added data to topic) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time4}'::TIMESTAMP; -- time4 (created source) K0 A0 K1 A1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time5}'::TIMESTAMP; -- time5 (updated data in topic in manipulate#1) K0 B0 K1 B1 K2 A2 K3 A3 K4 A4 > SELECT * FROM retain_history_source AS OF '${time6}'::TIMESTAMP; -- time6 (updated data in topic in manipulate#2) K0 C0 K1 C1 K2 C2 K3 C3 K4 C4 K5 C5 > SELECT * FROM retain_history_source AS OF '${time7}'::TIMESTAMP; -- time7 (updated data in topic again in manipulate#2) K0 D0 K1 C1 K2 C2 K3 C3 K4 C4 K5 C5 > SELECT * FROM retain_history_source; K0 D0 K1 C1 K2 C2 K3 C3 K4 C4 K5 C5 """ ) )
class RetainHistoryOnMv (base_version: MzVersion, rng: random.Random | None)
-
Expand source code Browse git
@disabled( "#24479 and compaction not predicable and now() not appropriate while mz_now() not applicable" ) class RetainHistoryOnMv(Check): def _can_run(self, e: Executor) -> bool: return e.current_mz_version >= MzVersion.parse_mz("v0.81.0") def initialize(self) -> Testdrive: return Testdrive( dedent( f""" > CREATE TABLE time_for_mv (time_index INT, t TIMESTAMP); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (0, now()); > CREATE TABLE retain_history_table (key INT, value INT); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (1, now()); > INSERT INTO retain_history_table VALUES (1, 100), (2, 200), (3, 300); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (2, now()); > CREATE MATERIALIZED VIEW retain_history_mv1 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS SELECT * FROM retain_history_table; > SELECT count(*) FROM retain_history_mv1; 3 # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (3, now()); """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" > UPDATE retain_history_table SET value = value + 10 WHERE key = 1; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (4, now()); > INSERT INTO retain_history_table VALUES (4, 400); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (5, now()); > DELETE FROM retain_history_table WHERE key = 3; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (6, now()); > CREATE MATERIALIZED VIEW 
retain_history_mv2 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS SELECT * FROM retain_history_table; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (7, now()); """, f""" > CREATE MATERIALIZED VIEW retain_history_mv3 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS SELECT * FROM retain_history_mv2; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (8, now()); > UPDATE retain_history_table SET value = value + 1; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (9, now()); > INSERT INTO retain_history_table VALUES (5, 500); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (10, now()); > DELETE FROM retain_history_table WHERE key = 4; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (11, now()); """, ] ] def validate(self) -> Testdrive: time_definitions = """ $ set-from-sql var=time0 SELECT t::STRING FROM time_for_mv WHERE time_index = 0 $ set-from-sql var=time1 SELECT t::STRING FROM time_for_mv WHERE time_index = 1 $ set-from-sql var=time2 SELECT t::STRING FROM time_for_mv WHERE time_index = 2 $ set-from-sql var=time3 SELECT t::STRING FROM time_for_mv WHERE time_index = 3 $ set-from-sql var=time4 SELECT t::STRING FROM time_for_mv WHERE time_index = 4 $ set-from-sql var=time5 SELECT t::STRING FROM time_for_mv WHERE time_index = 5 $ set-from-sql var=time6 SELECT t::STRING FROM time_for_mv WHERE time_index = 6 $ set-from-sql var=time7 SELECT t::STRING FROM time_for_mv WHERE time_index = 7 $ set-from-sql var=time8 SELECT t::STRING FROM time_for_mv WHERE time_index = 8 $ set-from-sql var=time9 SELECT t::STRING FROM time_for_mv WHERE 
time_index = 9 $ set-from-sql var=time10 SELECT t::STRING FROM time_for_mv WHERE time_index = 10 $ set-from-sql var=time11 SELECT t::STRING FROM time_for_mv WHERE time_index = 11 """ content_validations = "\n".join( f""" ! SELECT * FROM {mv_name} AS OF '${{time0}}'::TIMESTAMP; -- time0 (nothing exists) contains: is not valid for all inputs ! SELECT count(*) FROM {mv_name} AS OF '${{time1}}'::TIMESTAMP; -- time1 (table created) contains: is not valid for all inputs > SELECT * FROM {mv_name} AS OF '${{time2}}'::TIMESTAMP; -- time2 (table populated) 1 100 2 200 3 300 > SELECT * FROM {mv_name} AS OF '${{time3}}'::TIMESTAMP; -- time3 (mv1 created) 1 100 2 200 3 300 > SELECT * FROM {mv_name} AS OF '${{time4}}'::TIMESTAMP; -- time4 (table updated in manipulate#1) 1 110 2 200 3 300 > SELECT * FROM {mv_name} AS OF '${{time5}}'::TIMESTAMP; -- time5 (table updated in manipulate#1) 1 110 2 200 3 300 4 400 > SELECT * FROM {mv_name} AS OF '${{time6}}'::TIMESTAMP; -- time6 (table updated in manipulate#1) 1 110 2 200 4 400 > SELECT * FROM {mv_name} AS OF '${{time7}}'::TIMESTAMP; -- time7 (mv2 created in manipulate#1) 1 110 2 200 4 400 > SELECT * FROM {mv_name} AS OF '${{time8}}'::TIMESTAMP; -- time8 (mv3 created in manipulate#2) 1 110 2 200 4 400 > SELECT * FROM {mv_name} AS OF '${{time9}}'::TIMESTAMP; -- time9 (table updated in manipulate#2) 1 111 2 201 4 401 > SELECT * FROM {mv_name} AS OF '${{time10}}'::TIMESTAMP; -- time10 (table updated in manipulate#2) 1 111 2 201 4 401 5 500 > SELECT * FROM {mv_name} AS OF '${{time11}}'::TIMESTAMP; -- time11 (table updated in manipulate#2) 1 111 2 201 5 500 > SELECT * FROM {mv_name}; 1 111 2 201 5 500 """ for mv_name in [ "retain_history_mv1", "retain_history_mv2", "retain_history_mv3", ] ) definition_validations = f""" > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv1); "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv1\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR 
'{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\"" > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv2); "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv2\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\"" > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv3); "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv3\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_mv2\\"" """ other_validations = """ ? EXPLAIN OPTIMIZED PLAN AS TEXT FOR SELECT * FROM retain_history_mv1 Explained Query: ReadStorage materialize.public.retain_history_mv1 Target cluster: quickstart """ if self.base_version < MzVersion.parse_mz("v0.96.0-dev"): other_validations = remove_target_cluster_from_explain(other_validations) return Testdrive( dedent( f""" {time_definitions} {content_validations} {definition_validations} {other_validations} """ ) )
Ancestors
Class variables
var enabled : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( f""" > CREATE TABLE time_for_mv (time_index INT, t TIMESTAMP); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (0, now()); > CREATE TABLE retain_history_table (key INT, value INT); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (1, now()); > INSERT INTO retain_history_table VALUES (1, 100), (2, 200), (3, 300); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (2, now()); > CREATE MATERIALIZED VIEW retain_history_mv1 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS SELECT * FROM retain_history_table; > SELECT count(*) FROM retain_history_mv1; 3 # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (3, now()); """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ f""" > UPDATE retain_history_table SET value = value + 10 WHERE key = 1; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (4, now()); > INSERT INTO retain_history_table VALUES (4, 400); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (5, now()); > DELETE FROM retain_history_table WHERE key = 3; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (6, now()); > CREATE MATERIALIZED VIEW retain_history_mv2 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS SELECT * FROM retain_history_table; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (7, now()); """, f""" > CREATE MATERIALIZED VIEW retain_history_mv3 WITH (RETAIN HISTORY FOR '{RETAIN_HISTORY_DURATION}') AS SELECT * FROM retain_history_mv2; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (8, now()); > UPDATE retain_history_table SET value = value + 1; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (9, now()); > INSERT INTO retain_history_table VALUES (5, 500); # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (10, now()); > DELETE FROM retain_history_table WHERE key = 4; # Give it some time $ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration="1s" > INSERT INTO time_for_mv VALUES (11, now()); """, ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive:
    """Check the history, definitions and plan of the RETAIN HISTORY views.

    First, the twelve timestamps recorded in ``time_for_mv`` by ``initialize``
    and ``manipulate`` (indices 0-11) are read into ``${time0}`` ..
    ``${time11}``.

    Next, for each of ``retain_history_mv1``, ``retain_history_mv2`` and
    ``retain_history_mv3``, the view is queried ``AS OF`` every timestamp.
    The first two timestamps are expected to error; the rest are expected to
    return the table contents at that moment. The view's current contents
    are checked as well.

    Finally, the ``SHOW CREATE`` output of all three views and the optimized
    plan of ``retain_history_mv1`` are verified.

    Each script fragment is dedented on its own before the fragments are
    joined. The resulting script therefore does not depend on every fragment
    sharing the same source indentation, and testdrive commands never keep
    stray leading whitespace.
    """
    # One `$ set-from-sql` per recorded timestamp, exposing ${time0}..${time11}.
    time_definitions = "\n\n".join(
        f"$ set-from-sql var=time{index}\n"
        f"SELECT t::STRING FROM time_for_mv WHERE time_index = {index}"
        for index in range(12)
    )

    # NOTE(review): every view is created after time2 (mv1 between time2 and
    # time3, mv2 between time6 and time7, mv3 between time7 and time8), yet
    # rows are expected AS OF time2 onwards for all three. This assumes a
    # view's readable history reaches back before its own creation -- confirm;
    # it may be part of why this check is disabled.
    content_validations = "\n".join(
        dedent(
            f"""
            ! SELECT * FROM {mv_name} AS OF '${{time0}}'::TIMESTAMP; -- time0 (nothing exists)
            contains: is not valid for all inputs

            ! SELECT count(*) FROM {mv_name} AS OF '${{time1}}'::TIMESTAMP; -- time1 (table created)
            contains: is not valid for all inputs

            > SELECT * FROM {mv_name} AS OF '${{time2}}'::TIMESTAMP; -- time2 (table populated)
            1 100
            2 200
            3 300

            > SELECT * FROM {mv_name} AS OF '${{time3}}'::TIMESTAMP; -- time3 (mv1 created)
            1 100
            2 200
            3 300

            > SELECT * FROM {mv_name} AS OF '${{time4}}'::TIMESTAMP; -- time4 (table updated in manipulate#1)
            1 110
            2 200
            3 300

            > SELECT * FROM {mv_name} AS OF '${{time5}}'::TIMESTAMP; -- time5 (table updated in manipulate#1)
            1 110
            2 200
            3 300
            4 400

            > SELECT * FROM {mv_name} AS OF '${{time6}}'::TIMESTAMP; -- time6 (table updated in manipulate#1)
            1 110
            2 200
            4 400

            > SELECT * FROM {mv_name} AS OF '${{time7}}'::TIMESTAMP; -- time7 (mv2 created in manipulate#1)
            1 110
            2 200
            4 400

            > SELECT * FROM {mv_name} AS OF '${{time8}}'::TIMESTAMP; -- time8 (mv3 created in manipulate#2)
            1 110
            2 200
            4 400

            > SELECT * FROM {mv_name} AS OF '${{time9}}'::TIMESTAMP; -- time9 (table updated in manipulate#2)
            1 111
            2 201
            4 401

            > SELECT * FROM {mv_name} AS OF '${{time10}}'::TIMESTAMP; -- time10 (table updated in manipulate#2)
            1 111
            2 201
            4 401
            5 500

            > SELECT * FROM {mv_name} AS OF '${{time11}}'::TIMESTAMP; -- time11 (table updated in manipulate#2)
            1 111
            2 201
            5 500

            > SELECT * FROM {mv_name};
            1 111
            2 201
            5 500
            """
        )
        for mv_name in [
            "retain_history_mv1",
            "retain_history_mv2",
            "retain_history_mv3",
        ]
    )

    definition_validations = dedent(
        f"""
        > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv1);
        "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv1\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\""

        > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv2);
        "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv2\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_table\\""

        > SELECT create_sql FROM (SHOW CREATE MATERIALIZED VIEW retain_history_mv3);
        "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"retain_history_mv3\\" IN CLUSTER \\"quickstart\\" WITH (RETAIN HISTORY = FOR '{RETAIN_HISTORY_DURATION}', REFRESH = ON COMMIT) AS SELECT * FROM \\"materialize\\".\\"public\\".\\"retain_history_mv2\\""
        """
    )

    other_validations = """
        ? EXPLAIN OPTIMIZED PLAN AS TEXT FOR SELECT * FROM retain_history_mv1
        Explained Query:
          ReadStorage materialize.public.retain_history_mv1

        Target cluster: quickstart
        """
    if self.base_version < MzVersion.parse_mz("v0.96.0-dev"):
        # Presumably versions before v0.96.0-dev do not print the
        # "Target cluster" line in EXPLAIN output -- see the helper.
        other_validations = remove_target_cluster_from_explain(other_validations)
    # Dedent only after the optional removal, mirroring the original order.
    other_validations = dedent(other_validations)

    return Testdrive(
        "\n\n".join(
            [
                time_definitions,
                content_validations,
                definition_validations,
                other_validations,
            ]
        )
    )