misc.python.materialize.source_table_migration

Utilities for testing the source table migration

 1# Copyright Materialize, Inc. and contributors. All rights reserved.
 2#
 3# Use of this software is governed by the Business Source License
 4# included in the LICENSE file at the root of this repository.
 5#
 6# As of the Change Date specified in that file, in accordance with
 7# the Business Source License, use of this software will be governed
 8# by the Apache License, Version 2.0.
 9
10"""Utilities for testing the source table migration"""
11from materialize.mzcompose.composition import Composition
12
13
def verify_sources_after_source_table_migration(
    c: Composition,
    file: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """Verify every catalogued source after the source table migration.

    Looks up the schema-qualified name of each source whose id matches
    ``u%`` (presumably the user-created ones), prints the list, sets a 20s
    statement timeout via ``c.sql`` and then checks the sources one by one
    with ``_verify_source``.

    Args:
        c: Composition used to run the SQL statements.
        file: Label of the originating test file; used in the printed output
            and forwarded to ``_verify_source``.
        fail: Forwarded to ``_verify_source``; when true a failing check
            raises instead of only being printed.
        service: Forwarded as the ``service`` argument of every SQL call.
    """
    catalog_query = "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    qualified_names = [
        record[0] for record in c.sql_query(catalog_query, service=service)
    ]

    print(f"Sources created in {file} are: {qualified_names}")

    # Applied after the catalog lookup and before the per-source checks.
    c.sql("SET statement_timeout = '20s'", service=service)

    for qualified_name in qualified_names:
        _verify_source(c, file, qualified_name, fail=fail, service=service)
32
33
def _verify_source(
    c: Composition,
    file: str,
    source_name: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """Check that one source's definition no longer uses pre-migration syntax.

    Runs ``SHOW CREATE SOURCE`` for ``source_name`` and rejects a definition
    that contains ``FOR TABLE`` or ``FOR ALL TABLES``. ``CREATE SUBSOURCE`` is
    rejected as well, unless the source name ends in ``_progress``.

    Args:
        c: Composition used to run the query.
        file: Label of the originating test file; only used in the printed
            error message.
        source_name: Name of the source; interpolated into the statement
            as-is, without quoting.
        fail: When true, re-raise a detected problem after printing it;
            otherwise the problem is only printed.
        service: Forwarded as the ``service`` argument of ``c.sql_query``.

    Raises:
        AssertionError: If ``fail`` is true and a forbidden clause is found.
        Exception: If ``fail`` is true, any other error raised while querying
            or reading the result is re-raised unchanged.
    """
    print(f"Checking source: {source_name}")

    try:
        statement = f"SHOW CREATE SOURCE {source_name};"
        result = c.sql_query(statement, service=service)
        # The definition is read from the second column of the first row.
        sql = result[0][1]

        forbidden_clauses = ["FOR TABLE", "FOR ALL TABLES"]
        if not source_name.endswith("_progress"):
            forbidden_clauses.append("CREATE SUBSOURCE")

        for clause in forbidden_clauses:
            if clause in sql:
                # Raised explicitly instead of via `assert`: assert statements
                # are removed under `python -O`, which would turn this whole
                # verification into a silent no-op.
                raise AssertionError(f"{clause} found in: {sql}")

        print("OK.")
    except Exception as e:
        print(f"source-table-migration issue in {file}: {e}")

        if fail:
            # Bare raise re-raises the active exception with its traceback.
            raise
def verify_sources_after_source_table_migration( c: materialize.mzcompose.composition.Composition, file: str, fail: bool = False, service: str | None = None) -> None:
def verify_sources_after_source_table_migration(
    c: Composition,
    file: str,
    fail: bool = False,
    service: str | None = None,
) -> None:
    """Run the post-migration check against each source found in the catalog.

    Queries ``mz_sources`` joined with ``mz_schemas`` for the
    ``schema.name`` of every source whose id matches ``u%``, prints the
    resulting list, sets ``statement_timeout`` to 20s and hands each name to
    ``_verify_source``.

    Args:
        c: Composition used to run the SQL statements.
        file: Label of the originating test file; printed and forwarded.
        fail: Forwarded to ``_verify_source``.
        service: Forwarded as the ``service`` argument of every SQL call.
    """
    rows = c.sql_query(
        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
        service=service,
    )

    # Each row carries the qualified name in its first column.
    found_sources = []
    for row in rows:
        found_sources.append(row[0])

    print(f"Sources created in {file} are: {found_sources}")

    c.sql("SET statement_timeout = '20s'", service=service)

    for found_source in found_sources:
        _verify_source(c, file, found_source, fail=fail, service=service)