misc.python.materialize.source_table_migration

Utilities for testing the source table migration

 1# Copyright Materialize, Inc. and contributors. All rights reserved.
 2#
 3# Use of this software is governed by the Business Source License
 4# included in the LICENSE file at the root of this repository.
 5#
 6# As of the Change Date specified in that file, in accordance with
 7# the Business Source License, use of this software will be governed
 8# by the Apache License, Version 2.0.
 9
10"""Utilities for testing the source table migration"""
11
12from materialize.mzcompose.composition import Composition
13
14
15def verify_sources_after_source_table_migration(
16    c: Composition,
17    file: str,
18    fail: bool = False,
19    service: str | None = None,
20) -> None:
21    source_names_rows = c.sql_query(
22        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
23        service=service,
24    )
25    source_names = [row[0] for row in source_names_rows]
26
27    print(f"Sources created in {file} are: {source_names}")
28
29    c.sql("SET statement_timeout = '20s'", service=service)
30
31    for source_name in source_names:
32        _verify_source(c, file, source_name, fail=fail, service=service)
33
34
35def _verify_source(
36    c: Composition,
37    file: str,
38    source_name: str,
39    fail: bool = False,
40    service: str | None = None,
41) -> None:
42    try:
43        print(f"Checking source: {source_name}")
44
45        statement = f"SHOW CREATE SOURCE {source_name};"
46        result = c.sql_query(statement, service=service)
47        sql = result[0][1]
48        assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
49        assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"
50
51        if not source_name.endswith("_progress"):
52            assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"
53
54        print("OK.")
55    except Exception as e:
56        print(f"source-table-migration issue in {file}: {str(e)}")
57
58        if fail:
59            raise e
def verify_sources_after_source_table_migration( c: materialize.mzcompose.composition.Composition, file: str, fail: bool = False, service: str | None = None) -> None:
16def verify_sources_after_source_table_migration(
17    c: Composition,
18    file: str,
19    fail: bool = False,
20    service: str | None = None,
21) -> None:
22    source_names_rows = c.sql_query(
23        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';",
24        service=service,
25    )
26    source_names = [row[0] for row in source_names_rows]
27
28    print(f"Sources created in {file} are: {source_names}")
29
30    c.sql("SET statement_timeout = '20s'", service=service)
31
32    for source_name in source_names:
33        _verify_source(c, file, source_name, fail=fail, service=service)