misc.python.materialize.source_table_migration

Utilities for testing the source table migration

 1# Copyright Materialize, Inc. and contributors. All rights reserved.
 2#
 3# Use of this software is governed by the Business Source License
 4# included in the LICENSE file at the root of this repository.
 5#
 6# As of the Change Date specified in that file, in accordance with
 7# the Business Source License, use of this software will be governed
 8# by the Apache License, Version 2.0.
 9
10"""Utilities for testing the source table migration"""
11from materialize.mz_version import MzVersion
12from materialize.mzcompose.composition import Composition
13from materialize.version_list import get_published_minor_mz_versions
14
15
def verify_sources_after_source_table_migration(
    c: Composition, file: str, fail: bool = False
) -> None:
    """Run the post-migration checks on every source with a ``u``-prefixed id.

    The schema-qualified names (``schema.source``) of all matching sources
    are fetched and printed, a statement timeout of 20 seconds is set via
    ``c.sql``, and ``_verify_source`` is then invoked once per name.

    Args:
        c: Composition used to execute the SQL statements.
        file: Label that is included in the printed messages.
        fail: Forwarded to ``_verify_source``; when true, a failing check is
            re-raised instead of only being printed.
    """
    names_query = "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    # Each result row carries a single column: the qualified source name.
    qualified_names = [row[0] for row in c.sql_query(names_query)]

    print(f"Sources created in {file} are: {qualified_names}")

    # Applied before any per-source check is run.
    c.sql("SET statement_timeout = '20s'")

    for qualified_name in qualified_names:
        _verify_source(c, file, qualified_name, fail=fail)
30
31
def _verify_source(
    c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
    """Check that a single source is queryable and shows no legacy clauses.

    Two statements are issued: a ``SELECT count(*)`` on the source, and a
    ``SHOW CREATE SOURCE`` whose statement text must contain neither
    ``FOR TABLE`` nor ``FOR ALL TABLES``. Unless the source name ends in
    ``_progress``, the text must not contain ``CREATE SUBSOURCE`` either.

    Args:
        c: Composition used to execute the SQL statements.
        file: Label that is included in the printed failure message.
        source_name: Name of the source, interpolated verbatim into the SQL.
        fail: When true, any exception raised by a query or a check is
            re-raised after being printed; otherwise it is only printed.

    Raises:
        AssertionError: If ``fail`` is true and a forbidden clause is found.
        Exception: If ``fail`` is true, whatever the queries themselves raise.
    """
    try:
        print(f"Checking source: {source_name}")

        # must not crash
        c.sql_query(f"SELECT count(*) FROM {source_name};")

        result = c.sql_query(f"SHOW CREATE SOURCE {source_name};")
        # The statement text is taken from the second column of the first row.
        sql = result[0][1]

        forbidden_clauses = ["FOR TABLE", "FOR ALL TABLES"]
        if not source_name.endswith("_progress"):
            forbidden_clauses.append("CREATE SUBSOURCE")

        for clause in forbidden_clauses:
            if clause in sql:
                # Raised explicitly (rather than via `assert`) so the check
                # is not stripped when Python runs with -O.
                raise AssertionError(f"{clause} found in: {sql}")

        print("OK.")
    except Exception as e:
        print(f"source-table-migration issue in {file}: {e}")

        if fail:
            # Bare raise re-raises the active exception with its traceback.
            raise
57
58
# Cache for get_old_image_for_source_table_migration_test(): stays None until
# the first call has picked a version, and is reused by every later call.
_last_version: MzVersion | None = None


def get_old_image_for_source_table_migration_test() -> str:
    """Return the ``materialize/materialized`` image reference of an older version.

    On the first call the version reported by ``MzVersion.parse_cargo()`` is
    compared against the result of ``get_published_minor_mz_versions``
    (requested with ``limit=4`` and the current minor version excluded). The
    first entry that is lower than the cargo version is stored in the
    module-level ``_last_version`` and reused on later calls.

    NOTE(review): taking the first entry presumably relies on the helper
    returning versions newest-first — confirm against its implementation.

    Raises:
        IndexError: If none of the returned versions is lower than the
            cargo version.
    """
    global _last_version
    if _last_version is None:
        cargo_version = MzVersion.parse_cargo()
        published_versions = get_published_minor_mz_versions(
            limit=4, exclude_current_minor_version=True
        )
        older_versions = [
            candidate
            for candidate in published_versions
            if candidate < cargo_version
        ]
        _last_version = older_versions[0]
    return f"materialize/materialized:{_last_version}"
75
76
77def get_new_image_for_source_table_migration_test() -> str | None:
78    return None
def verify_sources_after_source_table_migration( c: materialize.mzcompose.composition.Composition, file: str, fail: bool = False) -> None:
def verify_sources_after_source_table_migration(
    c: Composition, file: str, fail: bool = False
) -> None:
    """Run the post-migration checks on every source with a ``u``-prefixed id.

    The schema-qualified names (``schema.source``) of all matching sources
    are fetched and printed, a statement timeout of 20 seconds is set via
    ``c.sql``, and ``_verify_source`` is then invoked once per name.

    Args:
        c: Composition used to execute the SQL statements.
        file: Label that is included in the printed messages.
        fail: Forwarded to ``_verify_source``; when true, a failing check is
            re-raised instead of only being printed.
    """
    names_query = "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    # Each result row carries a single column: the qualified source name.
    qualified_names = [row[0] for row in c.sql_query(names_query)]

    print(f"Sources created in {file} are: {qualified_names}")

    # Applied before any per-source check is run.
    c.sql("SET statement_timeout = '20s'")

    for qualified_name in qualified_names:
        _verify_source(c, file, qualified_name, fail=fail)
def get_old_image_for_source_table_migration_test() -> str:
def get_old_image_for_source_table_migration_test() -> str:
    """Return the ``materialize/materialized`` image reference of an older version.

    On the first call the version reported by ``MzVersion.parse_cargo()`` is
    compared against the result of ``get_published_minor_mz_versions``
    (requested with ``limit=4`` and the current minor version excluded). The
    first entry that is lower than the cargo version is stored in the
    module-level ``_last_version`` and reused on later calls.

    NOTE(review): taking the first entry presumably relies on the helper
    returning versions newest-first — confirm against its implementation.

    Raises:
        IndexError: If none of the returned versions is lower than the
            cargo version.
    """
    global _last_version
    if _last_version is None:
        cargo_version = MzVersion.parse_cargo()
        published_versions = get_published_minor_mz_versions(
            limit=4, exclude_current_minor_version=True
        )
        older_versions = [
            candidate
            for candidate in published_versions
            if candidate < cargo_version
        ]
        _last_version = older_versions[0]
    return f"materialize/materialized:{_last_version}"
def get_new_image_for_source_table_migration_test() -> str | None:
78def get_new_image_for_source_table_migration_test() -> str | None:
79    return None