Module materialize.feature_benchmark.scenarios.benchmark_main

# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
import re
from math import ceil, floor
from textwrap import dedent

from parameterized import parameterized_class  # type: ignore

from materialize.feature_benchmark.action import Action, Kgen, TdAction
from materialize.feature_benchmark.measurement_source import (
    Lambda,
    MeasurementSource,
    Td,
)
from materialize.feature_benchmark.scenario import (
    BenchmarkingSequence,
    Scenario,
    ScenarioBig,
    ScenarioDisabled,
)
from materialize.mz_version import MzVersion

# Entries added to __pdoc__ are hidden from the generated pdoc documentation.
__pdoc__ = {}
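
# A note on the Scenario helpers used throughout this module (defined in
# materialize.feature_benchmark.scenario and summarized here for orientation;
# see that module for the authoritative definitions): n() is the record count
# for the current scale (10 ** SCALE); table_ten() / view_ten() create a
# ten-row relation named "ten"; join() builds a cross join of "ten" with
# itself sized to the scenario's scale; and unique_values() is a SELECT
# expression enumerating n() distinct values over that join. Lifecycle hooks:
# shared() prepares input shared across scenarios (e.g. Kafka topics), init()
# performs one-time per-scenario setup, before() runs prior to each individual
# measurement, and benchmark() returns the statements to be timed.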


class FastPath(Scenario):
    """Feature benchmarks related to the "fast path" in query execution, as described in the
    'Internals of One-off Queries' presentation.
    """


class FastPathFilterNoIndex(FastPath):
    """Measure the time it takes for the fast path to filter our all rows from a materialized view and return"""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )


class MFPPushdown(Scenario):
    """Test MFP pushdown -- WHERE clause with a suitable condition and no index defined."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )


class FastPathFilterIndex(FastPath):
    """Measure the time it takes for the fast path to filter our all rows from a materialized view using an index and return"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    # Since an individual query of this type takes only ~1ms to execute, the results
    # are susceptible to a lot of random noise. As we cannot make the query any slower
    # by using e.g. a larger dataset, we run the query 1000 times in a row and measure
    # the total execution time.

    def benchmark(self) -> MeasurementSource:
        repeated_selects = "\n".join(
            "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for i in range(0, 1000)
        )

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{repeated_selects}

> SELECT 1
  /* B */
1
"""
        )


class FastPathOrderByLimit(FastPath):
    """Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1;
  /* A */
1
> SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000
  /* B */
"""
            + "\n".join([str(x) for x in range(self.n() - 1000, self.n())])
        )


class FastPathLimit(FastPath):
    """Benchmark the case SELECT * FROM source LIMIT <i> , optimized by #21615"""

    def init(self) -> list[Action]:
        return [
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()})
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                """
                > SELECT 1;
                  /* A */
                1
                > SELECT * FROM v1 LIMIT 100
                  /* B */
                """
            )
            + "\n".join([str(x) for x in range(1, 101)])
        )


class DML(Scenario):
    """Benchmarks around the performance of DML statements"""


class Insert(DML):
    """Measure the time it takes for an INSERT statement to return."""

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
  /* B */
"""
        )


class ManySmallInserts(DML):
    """Measure the time it takes for several small INSERT statements to return."""

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        random.seed(self.seed())

        statements = []
        for _ in range(0, 10000):
            statements.append(f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})")

        insert_statements_str = "\n".join(statements)

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

{insert_statements_str}
  /* B */
"""
        )


class InsertBatch(DML):
    """Measure the time it takes for a batch of INSERT statements to return."""

    SCALE = 4

    def benchmark(self) -> MeasurementSource:
        inserts = "\n".join(
            f"> INSERT INTO t1 VALUES ({i});" for i in range(0, self.n())
        )

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> SET auto_route_introspection_queries TO false

> BEGIN

{inserts}

> COMMIT
  /* B */
"""
        )


class InsertMultiRow(DML):
    """Measure the time it takes for a single multi-row INSERT statement to return."""

    SCALE = 5

    def benchmark(self) -> MeasurementSource:
        values = ", ".join(f"({i})" for i in range(0, self.n()))

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 VALUES {values}
  /* B */
"""
        )


class Update(DML):
    """Measure the time it takes for an UPDATE statement to return to client"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE TABLE t1 (f1 BIGINT);

> CREATE DEFAULT INDEX ON t1;

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> UPDATE t1 SET f1 = f1 + {self.n()}
  /* B */
"""
        )


class ManySmallUpdates(DML):
    """Measure the time it takes for several small UPDATE statements to return to client"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE TABLE t1 (f1 INT, f2 INT);

> CREATE DEFAULT INDEX ON t1;

> INSERT INTO t1 SELECT generate_series(1, 10);
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        random.seed(self.seed())

        statements = []
        for _ in range(0, 10000):
            statements.append(
                f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}"
            )

        update_statements_str = "\n".join(statements)

        return Td(
            f"""
> SELECT 1
  /* A */
1

{update_statements_str}
  /* B */
"""
        )


class UpdateMultiNoIndex(DML):
    """Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub Issue #11071"""

    def before(self) -> Action:
        # Due to extreme variability in the results, we have no option but to drop and re-create
        # the table prior to each measurement
        return TdAction(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 BIGINT);

> INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> UPDATE t1 SET f1 = f1 + {self.n()}

> SELECT COUNT(*) FROM t1 WHERE f1 > {self.n()}
  /* B */
{self.n()}
"""
        )


class InsertAndSelect(DML):
    """Measure the time it takes for an INSERT statement to return
    AND for a follow-up SELECT to return data, that is, for the
    dataflow to be completely caught up.
    """

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()};

> SELECT 1 FROM t1 WHERE f1 = 1
  /* B */
1
"""
        )


class Dataflow(Scenario):
    """Benchmark scenarios around individual dataflow patterns/operators"""


class OrderBy(Dataflow):
    """Benchmark ORDER BY as executed by the dataflow layer,
    in contrast with an ORDER BY executed using a Finish step in the coordinator"""

    def init(self) -> Action:
        # Just to spice things up a bit, we perform individual
        # inserts here so that the rows are assigned separate timestamps
        inserts = "\n\n".join(f"> INSERT INTO ten VALUES ({i})" for i in range(0, 10))

        return TdAction(
            f"""
> CREATE TABLE ten (f1 INTEGER);

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

{inserts}

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )

    def benchmark(self) -> MeasurementSource:
        # An explicit LIMIT is needed so that the ORDER BY is not optimized away
        return Td(
            f"""
> DROP MATERIALIZED VIEW IF EXISTS v2
  /* A */

> CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999

> SELECT COUNT(*) FROM v2
  /* B */
{self.n()}
"""
        )


class CountDistinct(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()};

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT COUNT(DISTINCT f1) AS f1 FROM v1
  /* B */
{self.n()}
"""
        )


class MinMax(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT MIN(f1), MAX(f1) AS f1 FROM v1
  /* B */
0 {self.n()-1}
"""
        )


class MinMaxMaintained(Dataflow):
    """Benchmark MinMax as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2
  /* A */

> CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
0 {self.n()-1}
"""
        )


class GroupBy(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )


class GroupByMaintained(Dataflow):
    """Benchmark GroupBy as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

> CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )


class CrossJoin(Dataflow):
    def init(self) -> Action:
        return self.view_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP MATERIALIZED VIEW IF EXISTS v1;

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
  /* A */

> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
  /* B */
true
"""
        )


class AccumulateReductions(Dataflow):
    """Benchmark the accumulation of reductions."""

    def before(self) -> Action:
        return TdAction(
            """
> DROP TABLE IF EXISTS t CASCADE;
> CREATE TABLE t (a int, b int, c int, d int);

> CREATE MATERIALIZED VIEW data AS
  SELECT a, a AS b FROM generate_series(1, 10000000) AS a
  UNION ALL
  SELECT a, b FROM t;

> INSERT INTO t (a, b) VALUES (1, 1);
> INSERT INTO t (a, b) VALUES (0, 0);

> DROP CLUSTER IF EXISTS idx_cluster CASCADE;
> CREATE CLUSTER idx_cluster SIZE '1-8G', REPLICATION FACTOR 1;

> CREATE VIEW accumulable AS
  SELECT
    a,
    sum(a) AS sum_a, count(a) as cnt_a,
    sum(b) AS sum_b, count(b) as cnt_b
  FROM data
  GROUP BY a;
"""
        )

    def benchmark(self) -> MeasurementSource:
        sql = """
> SELECT 1
  /* A */
1

> CREATE INDEX i_accumulable IN CLUSTER idx_cluster ON accumulable(a);

> SET CLUSTER = idx_cluster;
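
# The asserted plan computes the count in the CTE l0 and unions it with a
# Map(0)/Negate branch that produces a single 0 when l0 is empty -- the
# standard plan shape for COUNT(*) over a possibly-empty input.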

? EXPLAIN SELECT count(*) FROM accumulable;
Explained Query:
  Return // { arity: 1 }
    Union // { arity: 1 }
      Get l0 // { arity: 1 }
      Map (0) // { arity: 1 }
        Union // { arity: 0 }
          Negate // { arity: 0 }
            Project () // { arity: 0 }
              Get l0 // { arity: 1 }
          Constant // { arity: 0 }
            - ()
  With
    cte l0 =
      Reduce aggregates=[count(*)] // { arity: 1 }
        Project () // { arity: 0 }
          ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 }

Used Indexes:
  - materialize.public.i_accumulable (*** full scan ***)

Target cluster: idx_cluster

> SELECT count(*) FROM accumulable;
  /* B */
10000001

> SET CLUSTER = default;
"""

        if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"):
            sql = remove_arity_information_from_explain(sql)

        if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)

        return Td(sql)


class Retraction(Dataflow):
    """Benchmark the time it takes to process a very large retraction"""

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP TABLE IF EXISTS ten CASCADE;

> CREATE TABLE ten (f1 INTEGER);

> INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}

> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
true
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1
  /* A */
1

> DELETE FROM ten;

> SELECT COUNT(*) FROM v1
  /* B */
0
"""
        )


class CreateIndex(Dataflow):
    """Measure the time it takes for CREATE INDEX to return *plus* the time
    it takes for a SELECT query that would use the index to return rows.
    """

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}

# Make sure the dataflow is fully hydrated
> SELECT 1 FROM t1 WHERE f1 = 0;
1
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> DROP INDEX IF EXISTS i1;
  /* A */

> CREATE INDEX i1 ON t1(f1);

> SELECT COUNT(*)
  FROM t1 AS a1, t1 AS a2
  WHERE a1.f1 = a2.f1
  AND a1.f1 = 0
  AND a2.f1 = 0
  /* B */
1
"""
        )


class DeltaJoin(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1

# Delta joins require 3+ tables
> SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
  /* B */
{self.n()}
"""
        )


class DeltaJoinMaintained(Dataflow):
    """Benchmark DeltaJoin as an indexed view with table-based data initialization, where the
    empty frontier is not emitted, in contrast with one-shot SELECT processing based on data
    initialized as a constant view"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

# Delta joins require 3+ tables
> CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
{self.n()}
"""
        )


class DifferentialJoin(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
  /* B */
{self.n()}
"""
        )


class FullOuterJoin(Dataflow):
    def benchmark(self) -> BenchmarkingSequence:
        columns_select = ", ".join(
            [f"a{i+1}.f1 AS f{i+1}" for i in range(0, floor(self.scale()))]
        )
        columns_using = ", ".join([f"f{i+1}" for i in range(0, floor(self.scale()))])
        inserts = "\n".join([f"> INSERT INTO ten VALUES ({i+1})" for i in range(0, 10)])

        return [
            Td(
                f"""
> DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE;

> DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE;

> DROP TABLE IF EXISTS ten;

> CREATE TABLE ten (f1 INTEGER);

> CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()}
> SELECT 1;
  /* A */
1

> CREATE MATERIALIZED VIEW v2 AS
  SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2
  FROM v1 AS a1
  FULL OUTER JOIN v1 AS a2 USING ({columns_using});

{inserts}

> SELECT * FROM v2;
  /* B */
{self.n()} {self.n()}
"""
            )
        ]


class Finish(Scenario):
    """Benchmarks around te Finish stage of query processing"""


class FinishOrderByLimit(Finish):
    """Benchmark ORDER BY + LIMIT without the benefit of an index"""

    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1
  /* B */
{self.n()-1}
"""
        )


class Kafka(Scenario):
    """Benchmarks around Kafka sources."""


class KafkaEnvelopeNoneBytes(Kafka):
    def shared(self) -> Action:
        data = "a" * 512
        return TdAction(
            f"""
$ kafka-create-topic topic=kafka-envelope-none-bytes

$ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()}
{data}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

# The [version<...] / [version>=...] prefixes are testdrive guards: each command
# runs only when the Materialize version matches (7800 presumably encoding
# v0.78.0, from which an explicit SECURITY PROTOCOL is specified).
>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}')
  FORMAT BYTES
  ENVELOPE NONE
  /* A */

> SELECT COUNT(*) = {self.n()} FROM s1
  /* B */
true
"""
        )


class KafkaUpsert(Kafka):
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-upsert

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }}

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}}
{{"f1": 2}} {{"f2": 2}}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${{testdrive.schema-registry-url}}'
  );

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT
  /* A */

> SELECT f1 FROM s1
  /* B */
1
2
"""
        )


class KafkaUpsertUnique(Kafka):
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=upsert-unique partitions=16

$ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  /* A */

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT

> SELECT COUNT(*) FROM s1;
  /* B */
{self.n()}
"""
        )


class KafkaRestart(ScenarioDisabled):
    """This scenario dates from the pre-persistence era where the entire topic was re-ingested from scratch.
    With presistence however, no reingestion takes place and the scenario exhibits extreme variability.
    Instead of re-ingestion, we are measuring mostly the speed of COUNT(*), further obscured by
    the one second timestamp granularity
    """

    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-recovery partitions=8

$ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def init(self) -> Action:
        return TdAction(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT;

# Make sure we are fully caught up before continuing
> SELECT COUNT(*) FROM s1;
{self.n()}

# Give time for any background tasks (e.g. compaction) to settle down
> SELECT mz_unsafe.mz_sleep(10)
<null>
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
> SELECT COUNT(*) /* {self.n()} */ FROM s1;
  /* B */
{self.n()}
"""
            ),
        ]


class KafkaRestartBig(ScenarioBig):
    """Ingest 100M records without constructing
    a dataflow that would keep all of them in memory. For the purpose, we
    emit a bunch of "EOF" records after the primary ingestion is complete
    and consider that the source has caught up when all the EOF records have
    been seen.
    """

    SCALE = 8

    def shared(self) -> list[Action]:
        return [
            TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"),
            # Ingest 10 ** SCALE records
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=random",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=32",
                    "--min-message-size=32",
                    "--key-min=256",
                    f"--key-max={256+(self.n()**2)}",
                ],
            ),
            # Add 256 EOF markers with key values <= 256.
            # This number is high enough to guarantee that there will be an EOF marker
            # in each partition, even if the number of partitions is increased in the future.
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=sequential",
                    "--num-records=256",
                    "--values=bytes",
                    "--min-message-size=32",
                    "--max-message-size=32",
                ],
            ),
        ]

    def init(self) -> Action:
        return TdAction(
            f"""
>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE UPSERT;

# Confirm that all the EOF markers generated above have been processed
> CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1 WHERE key <= '\\x00000000000000ff'

> SELECT * FROM s1_is_complete;
true
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                """
> SELECT * FROM s1_is_complete
  /* B */
true
"""
            ),
        ]


# parameterized_class below generates one subclass per scale, named via
# Scenario.name_with_scale (e.g. KafkaEnvelopeNoneBytesScalability_scale_5);
# hide those generated classes from the pdoc output.
for i in [5, 6, 7, 8, 9]:
    __pdoc__[f"KafkaEnvelopeNoneBytesScalability_scale_{i}"] = False


@parameterized_class(
    [{"SCALE": i} for i in [5, 6, 7, 8, 9]], class_name_func=Scenario.name_with_scale
)
class KafkaEnvelopeNoneBytesScalability(ScenarioBig):
    """Run the same scenario across different scales. Do not materialize the entire
    source but rather just a non-memory-consuming view on top of it.
    """

    def shared(self) -> list[Action]:
        return [
            TdAction(
                """
$ kafka-create-topic topic=kafka-scalability partitions=8
"""
            ),
            Kgen(
                topic="kafka-scalability",
                args=[
                    "--keys=sequential",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=100",
                    "--min-message-size=100",
                ],
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE NONE
  /* A */

> CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1;

> SELECT c = {self.n()} FROM v1
  /* B */
true
"""
        )


class Sink(Scenario):
    """Benchmarks around Kafka sinks."""


class ExactlyOnce(Sink):
    """Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited
    means to figure out when the complete output has been emited, we have no option but to re-ingest
    the data to determine completion.
    """

    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=sink-input partitions=16

$ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def init(self) -> Action:
        return TdAction(
            f"""
>[version<7800]  CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> DROP CLUSTER IF EXISTS source_cluster CASCADE

> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE source1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

> SELECT COUNT(*) FROM source1;
{self.n()}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP SINK IF EXISTS sink1;
> DROP SOURCE IF EXISTS sink1_check CASCADE;
  /* A */

> DROP CLUSTER IF EXISTS sink_cluster CASCADE
> CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SINK sink1
  IN CLUSTER sink_cluster
  FROM source1
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true

# Wait until all the records have been emitted from the sink, as observed by the sink1_check source

> CREATE SOURCE sink1_check
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

> CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check;

> SELECT * FROM sink1_check_v
  /* B */
"""
            + str(self.n())
        )


class PgCdc(Scenario):
    """Benchmarks around the Postgres CDC source."""


class PgCdcInitialLoad(PgCdc):
    """Measure the time it takes to read 1M existing records from Postgres
    when creating a materialized source"""

    def shared(self) -> Action:
        return TdAction(
            f"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;

DROP PUBLICATION IF EXISTS mz_source;
CREATE PUBLICATION mz_source FOR ALL TABLES;

CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x;
ALTER TABLE pk_table REPLICA IDENTITY FULL;
"""
        )

    def before(self) -> Action:
        return TdAction(
            """
> DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
            """
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'

> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
    HOST postgres,
    DATABASE postgres,
    USER postgres,
    PASSWORD SECRET pgpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE mz_source_pgcdc
  IN CLUSTER source_cluster
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source')
  FOR TABLES ("pk_table")
  /* A */

> SELECT count(*) FROM pk_table
  /* B */
{self.n()}
            """
        )


class PgCdcStreaming(PgCdc):
    """Measure the time it takes to ingest records from Postgres post-snapshot"""

    SCALE = 5

    def shared(self) -> Action:
        return TdAction(
            """
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;

DROP PUBLICATION IF EXISTS p1;
CREATE PUBLICATION p1 FOR ALL TABLES;
"""
        )

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;

$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
ALTER TABLE t1 REPLICA IDENTITY FULL;

> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'

> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
    HOST postgres,
    DATABASE postgres,
    USER postgres,
    PASSWORD SECRET pgpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1')
  FOR TABLES ("t1")
            """
        )

    def benchmark(self) -> MeasurementSource:
        insertions = "\n".join(
            [
                f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {self.n()/1000}) as x;\nCOMMIT;"
                for i in range(0, 1000)
            ]
        )

        return Td(
            f"""
> SELECT 1;
  /* A */
1

$ postgres-execute connection=postgres://postgres:postgres@postgres
{insertions}

> SELECT count(*) FROM t1
  /* B */
{self.n()}
            """
        )


class MySqlCdc(Scenario):
    """Benchmarks around the MySQL CDC source."""


class MySqlInitialLoad(MySqlCdc):
    """Measure the time it takes to read 1M existing records from MySQL
    when creating a materialized source"""

    FIXED_SCALE = True  # TODO: Remove when #25323 is fixed

    def shared(self) -> Action:
        return TdAction(
            f"""
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;

SET @i:=0;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()};
"""
        )

    def before(self) -> Action:
        return TdAction(
            """
> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
            """
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
    HOST mysql,
    USER root,
    PASSWORD SECRET mysqlpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE mz_source_mysqlcdc
  IN CLUSTER source_cluster
  FROM MYSQL CONNECTION mysql_conn
  FOR TABLES (public.pk_table)
  /* A */

> SELECT count(*) FROM pk_table
  /* B */
{self.n()}
            """
        )


class MySqlStreaming(MySqlCdc):
    """Measure the time it takes to ingest records from MySQL post-snapshot"""

    SCALE = 5

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        return version >= MzVersion.parse_mz("v0.88.0-dev")

    def shared(self) -> Action:
        return TdAction(
            """
$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
"""
        )

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;

$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);

> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
    HOST mysql,
    USER root,
    PASSWORD SECRET mysqlpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM MYSQL CONNECTION mysql_conn
  FOR TABLES (public.t1)
            """
        )

    def benchmark(self) -> MeasurementSource:
        insertions = "\n".join(
            [
                dedent(
                    f"""
                    SET @i:=0;
                    INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {round(self.n()/1000)};
                    COMMIT;
                    """
                )
                for i in range(0, 1000)
            ]
        )

        return Td(
            f"""
> SELECT 1;
  /* A */
1

$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
USE public;
{insertions}

> SELECT count(*) FROM t1
  /* B */
{self.n()}
            """
        )


class Coordinator(Scenario):
    """Feature benchmarks pertaining to the coordinator."""


class QueryLatency(Coordinator):
    """Measure the time it takes to run SELECT 1 queries"""

    SCALE = 3

    def benchmark(self) -> MeasurementSource:
        selects = "\n".join("> SELECT 1\n1\n" for i in range(0, self.n()))

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{selects}

> SELECT 1;
  /* B */
1
"""
        )


class ConnectionLatency(Coordinator):
    """Measure the time it takes to establish connections to Mz"""

    SCALE = 2  # Many connections * many measurements = TCP port exhaustion

    def benchmark(self) -> MeasurementSource:
        connections = "\n".join(
            """
$ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr}
SELECT 1;
"""
            for i in range(0, self.n())
        )

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{connections}

> SELECT 1;
  /* B */
1
"""
        )


class Startup(Scenario):
    """Benchmarks around Materialize startup and restart."""


class StartupEmpty(Startup):
    """Measure the time it takes to restart an empty Mz instance."""

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                """
> SELECT 1;
  /* B */
1
"""
            ),
        ]


class StartupLoaded(Scenario):
    """Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something"""

    SCALE = 1.2  # 25 objects of each kind
    FIXED_SCALE = (
        True  # Cannot scale to 100s of objects, so --size=+N will have no effect
    )

    def shared(self) -> Action:
        return TdAction(
            self.schema()
            + """
$ kafka-create-topic topic=startup-time

$ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1
{"f2": 1}
"""
        )

    def init(self) -> Action:
        create_tables = "\n".join(
            f"> CREATE TABLE t{i} (f1 INTEGER);\n> INSERT INTO t{i} DEFAULT VALUES;"
            for i in range(0, self.n())
        )
        create_sources = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS source{i}_cluster CASCADE;
> CREATE CLUSTER source{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE source{i}
  IN CLUSTER source{i}_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE NONE
"""
            for i in range(0, self.n())
        )
        join = " ".join(
            f"LEFT JOIN source{i} USING (f2)" for i in range(1, (ceil(self.scale())))
        )

        create_views = "\n".join(
            f"> CREATE MATERIALIZED VIEW v{i} AS SELECT * FROM source{i} AS s {join} LIMIT {i+1}"
            for i in range(0, self.n())
        )

        create_sinks = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS sink{i}_cluster;
> CREATE CLUSTER sink{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink{i}
  IN CLUSTER sink{i}_cluster
  FROM source{i}
  INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE DEBEZIUM
"""
            for i in range(0, self.n())
        )

        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};

> DROP OWNED BY materialize CASCADE;

>[version<7800]  CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

{create_tables}
{create_sources}
{create_views}
{create_sinks}
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        check_tables = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM t{i}\ntrue" for i in range(0, self.n())
        )
        check_sources = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM source{i}\ntrue" for i in range(0, self.n())
        )
        check_views = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM v{i}\ntrue" for i in range(0, self.n())
        )

        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
{check_views}
{check_sources}
{check_tables}
> SELECT 1;
  /* B */
1
"""
            ),
        ]


class HydrateIndex(Scenario):
    """Measure the time it takes for an index to hydrate when a cluster comes online."""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        sql = f"""
> DROP TABLE IF EXISTS t1 CASCADE
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER)
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0)
> CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1)
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}
> UPDATE t1 SET f1 = f1 + 100000
> UPDATE t1 SET f1 = f1 + 1000000
> UPDATE t1 SET f1 = f1 + 10000000
> UPDATE t1 SET f1 = f1 + 100000000
> UPDATE t1 SET f1 = f1 + 1000000000
> SELECT 1
  /* A */
1
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = idx_cluster
? EXPLAIN SELECT COUNT(*) FROM t1
Explained Query:
  Return // {{ arity: 1 }}
    Union // {{ arity: 1 }}
      Get l0 // {{ arity: 1 }}
      Map (0) // {{ arity: 1 }}
        Union // {{ arity: 0 }}
          Negate // {{ arity: 0 }}
            Project () // {{ arity: 0 }}
              Get l0 // {{ arity: 1 }}
          Constant // {{ arity: 0 }}
            - ()
  With
    cte l0 =
      Reduce aggregates=[count(*)] // {{ arity: 1 }}
        Project () // {{ arity: 0 }}
          ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}

Used Indexes:
  - materialize.public.i1 (*** full scan ***)

Target cluster: idx_cluster

> SELECT COUNT(*) FROM t1
  /* B */
{self._n}
> SET CLUSTER = default
"""

        if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"):
            sql = remove_arity_information_from_explain(sql)

        if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)

        return Td(sql)


def remove_arity_information_from_explain(sql: str) -> str:
    return re.sub(r" // { arity: \d+ }", "", sql)


def remove_target_cluster_from_explain(sql: str) -> str:
    return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)


class SwapSchema(Scenario):
    """Measure the time it takes for ALTER SCHEMA ... SWAP WITH to return in the
    presence of many dependent views (a blue/green cutover pattern)."""

    SCALE = 2
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        blue_views_on_table = "\n".join(
            f"> CREATE VIEW blue.v{i} AS SELECT * FROM blue.t1;"
            for i in range(0, self.n())
        )

        green_views_on_table = "\n".join(
            f"> CREATE VIEW green.v{i} AS SELECT * FROM green.t1;"
            for i in range(0, self.n())
        )

        noise_views_on_blue_view = "\n".join(
            f"> CREATE VIEW noise.v{i} AS SELECT * FROM blue.v0;"
            for i in range(0, self.n())
        )

        noise_views_on_noise_view = "\n".join(
            f"> CREATE VIEW noise.extra_v{i} AS SELECT * FROM noise.v0;"
            for i in range(0, self.n())
        )

        return [
            TdAction(
                f"""
> CREATE SCHEMA blue;
> CREATE SCHEMA green;
> CREATE SCHEMA noise;

> CREATE TABLE blue.t1 (a int, b text);
> CREATE TABLE green.t1 (a int, b text);

{blue_views_on_table}
{green_views_on_table}
{noise_views_on_blue_view}
{noise_views_on_noise_view}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                """
                > SELECT 1;
                  /* A */
                1

                > ALTER SCHEMA blue SWAP WITH green;

                > SELECT 1;
                  /* B */
                1
                """
            )
        )
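

# How the /* A */ and /* B */ markers are consumed (a sketch of the harness
# behavior as used throughout this module, not authoritative): the benchmark
# driver records the wall-clock time at which the statement tagged /* A */
# completes and the time at which the statement tagged /* B */ completes, and
# reports the difference as the scenario's measurement.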


> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
  /* A */

> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
  /* B */
true
"""
        )
def init(self) ‑> Action
Expand source code Browse git
def init(self) -> Action:
    return self.view_ten()

Inherited members

class DML (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmarks around the performance of DML statements

Expand source code Browse git
class DML(Scenario):
    """Benchmarks around the performance of DML statements"""

    pass

Ancestors

Subclasses

Inherited members

class Dataflow (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark scenarios around individual dataflow patterns/operators

Expand source code Browse git
class Dataflow(Scenario):
    """Benchmark scenarios around individual dataflow patterns/operators"""

    pass

Ancestors

Subclasses

Inherited members

class DeltaJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark scenarios around individual dataflow patterns/operators

Expand source code Browse git
class DeltaJoin(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1

# Delta joins require 3+ tables
> SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
  /* B */
{self.n()}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1

# Delta joins require 3+ tables
> SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
  /* B */
{self.n()}
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

Inherited members

class DeltaJoinMaintained (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark DeltaJoin as an indexed view with table-based data initialization, where the empty frontier is not emitted, in contrast with one-shot SELECT processing based on data initialized as a constant view

Expand source code Browse git
class DeltaJoinMaintained(Dataflow):
    """Benchmark DeltaJoin as an indexed view with table-based data initialization, where the
    empty frontier is not emitted, in contrast with one-shot SELECT processing based on data
    initialized as a constant view"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

# Delta joins require 3+ tables
> CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
{self.n()}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

# Delta joins require 3+ tables
> CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
{self.n()}
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
"""
            ),
        ]

Inherited members

class DifferentialJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark scenarios around individual dataflow patterns/operators

Expand source code Browse git
class DifferentialJoin(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
  /* B */
{self.n()}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1;
  /* A */
1


> SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
  /* B */
{self.n()}
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
"""
            ),
        ]

Inherited members

class ExactlyOnce (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited means to figure out when the complete output has been emitted, we have no option but to re-ingest the data to determine completion.

Expand source code Browse git
class ExactlyOnce(Sink):
    """Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited
    means to figure out when the complete output has been emitted, we have no option but to re-ingest
    the data to determine completion.
    """

    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=sink-input partitions=16

$ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def init(self) -> Action:
        return TdAction(
            f"""
>[version<7800]  CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> DROP CLUSTER IF EXISTS source_cluster CASCADE

> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE source1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

> SELECT COUNT(*) FROM source1;
{self.n()}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP SINK IF EXISTS sink1;
> DROP SOURCE IF EXISTS sink1_check CASCADE;
  /* A */

> DROP CLUSTER IF EXISTS sink_cluster CASCADE
> CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SINK sink1
  IN CLUSTER sink_cluster
  FROM source1
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true

# Wait until all the records have been emitted from the sink, as observed by the sink1_check source

> CREATE SOURCE sink1_check
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

> CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check;

> SELECT * FROM sink1_check_v
  /* B */
"""
            + str(self.n())
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP SINK IF EXISTS sink1;
> DROP SOURCE IF EXISTS sink1_check CASCADE;
  /* A */

> DROP CLUSTER IF EXISTS sink_cluster CASCADE
> CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SINK sink1
  IN CLUSTER sink_cluster
  FROM source1
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f1)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE DEBEZIUM

$ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true

# Wait until all the records have been emitted from the sink, as observed by the sink1_check source

> CREATE SOURCE sink1_check
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

> CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check;

> SELECT * FROM sink1_check_v
  /* B */
"""
            + str(self.n())
        )
def init(self) ‑> Action
Expand source code Browse git
    def init(self) -> Action:
        return TdAction(
            f"""
>[version<7800]  CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> DROP CLUSTER IF EXISTS source_cluster CASCADE

> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE source1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT;

> SELECT COUNT(*) FROM source1;
{self.n()}
"""
        )
def shared(self) ‑> Action
Expand source code Browse git
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=sink-input partitions=16

$ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

Inherited members

class FastPath (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Feature benchmarks related to the "fast path" in query execution, as described in the 'Internals of One-off Queries' presentation.

Expand source code Browse git
class FastPath(Scenario):
    """Feature benchmarks related to the "fast path" in query execution, as described in the
    'Internals of One-off Queries' presentation.
    """

Ancestors

Subclasses

Inherited members

class FastPathFilterIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for the fast path to filter out all rows from a materialized view using an index and return

Expand source code Browse git
class FastPathFilterIndex(FastPath):
    """Measure the time it takes for the fast path to filter our all rows from a materialized view using an index and return"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    # Since an individual query of the type being benchmarked takes about 1ms to execute, the results are susceptible
    # to a lot of random noise. As we cannot make the query any slower by using e.g. a larger dataset,
    # we run the query 1000 times in a row and measure the total execution time.

    def benchmark(self) -> MeasurementSource:
        hundred_selects = "\n".join(
            "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for i in range(0, 1000)
        )

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{hundred_selects}

> SELECT 1
  /* B */
1
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        hundred_selects = "\n".join(
            "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for i in range(0, 1000)
        )

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{hundred_selects}

> SELECT 1
  /* B */
1
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

Inherited members

class FastPathFilterNoIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for the fast path to filter out all rows from a materialized view and return

Expand source code Browse git
class FastPathFilterNoIndex(FastPath):
    """Measure the time it takes for the fast path to filter our all rows from a materialized view and return"""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )

Ancestors

Class variables

var FIXED_SCALE : bool
var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

Inherited members

class FastPathLimit (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark the case SELECT * FROM source LIMIT <i>, optimized by #21615

Expand source code Browse git
class FastPathLimit(FastPath):
    """Benchmark the case SELECT * FROM source LIMIT <i> , optimized by #21615"""

    def init(self) -> list[Action]:
        return [
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()})
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                """
                > SELECT 1;
                  /* A */
                1
                > SELECT * FROM v1 LIMIT 100
                  /* B */
                """
            )
            + "\n".join([str(x) for x in range(1, 101)])
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
def benchmark(self) -> MeasurementSource:
    return Td(
        dedent(
            """
            > SELECT 1;
              /* A */
            1
            > SELECT * FROM v1 LIMIT 100
              /* B */
            """
        )
        + "\n".join([str(x) for x in range(1, 101)])
    )
def init(self) ‑> list[Action]
Expand source code Browse git
def init(self) -> list[Action]:
    return [
        TdAction(
            f"""
            > CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()})
            """
        ),
    ]

Inherited members

class FastPathOrderByLimit (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>

Expand source code Browse git
class FastPathOrderByLimit(FastPath):
    """Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1;
  /* A */
1
> SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000
  /* B */
"""
            + "\n".join([str(x) for x in range(self.n() - 1000, self.n())])
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1;
  /* A */
1
> SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000
  /* B */
"""
            + "\n".join([str(x) for x in range(self.n() - 1000, self.n())])
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> CREATE DEFAULT INDEX ON v1;

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

Inherited members

class Finish (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmarks around the Finish stage of query processing

Expand source code Browse git
class Finish(Scenario):
    """Benchmarks around te Finish stage of query processing"""

Ancestors

Subclasses

Inherited members

class FinishOrderByLimit (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark ORDER BY + LIMIT without the benefit of an index

Expand source code Browse git
class FinishOrderByLimit(Finish):
    """Benchmark ORDER BY + LIMIT without the benefit of an index"""

    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1
  /* B */
{self.n()-1}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1
  /* B */
{self.n()-1}
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

Inherited members

class FullOuterJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark scenarios around individual dataflow patterns/operators

Expand source code Browse git
class FullOuterJoin(Dataflow):
    def benchmark(self) -> BenchmarkingSequence:
        columns_select = ", ".join(
            [f"a{i+1}.f1 AS f{i+1}" for i in range(0, floor(self.scale()))]
        )
        columns_using = ", ".join([f"f{i+1}" for i in range(0, floor(self.scale()))])
        inserts = "\n".join([f"> INSERT INTO ten VALUES ({i+1})" for i in range(0, 10)])

        return [
            Td(
                f"""
> DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE;

> DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE;

> DROP TABLE IF EXISTS ten;

> CREATE TABLE ten (f1 INTEGER);

> CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()}
> SELECT 1;
  /* A */
1

> CREATE MATERIALIZED VIEW v2 AS
  SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2
  FROM v1 AS a1
  FULL OUTER JOIN v1 AS a2 USING ({columns_using});

{inserts}

> SELECT * FROM v2;
  /* B */
{self.n()} {self.n()}
"""
            )
        ]
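
To make the string building above concrete, a small sketch of the generated SQL fragments (scale = 3 is an arbitrary value chosen for illustration; the real value comes from the scenario's scale()):

from math import floor

scale = 3  # hypothetical illustration value

columns_select = ", ".join(f"a{i+1}.f1 AS f{i+1}" for i in range(floor(scale)))
columns_using = ", ".join(f"f{i+1}" for i in range(floor(scale)))

print(columns_select)  # a1.f1 AS f1, a2.f1 AS f2, a3.f1 AS f3
print(columns_using)   # f1, f2, f3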

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
Expand source code Browse git
    def benchmark(self) -> BenchmarkingSequence:
        columns_select = ", ".join(
            [f"a{i+1}.f1 AS f{i+1}" for i in range(0, floor(self.scale()))]
        )
        columns_using = ", ".join([f"f{i+1}" for i in range(0, floor(self.scale()))])
        inserts = "\n".join([f"> INSERT INTO ten VALUES ({i+1})" for i in range(0, 10)])

        return [
            Td(
                f"""
> DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE;

> DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE;

> DROP TABLE IF EXISTS ten;

> CREATE TABLE ten (f1 INTEGER);

> CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()}
> SELECT 1;
  /* A */
1

> CREATE MATERIALIZED VIEW v2 AS
  SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2
  FROM v1 AS a1
  FULL OUTER JOIN v1 AS a2 USING ({columns_using});

{inserts}

> SELECT * FROM v2;
  /* B */
{self.n()} {self.n()}
"""
            )
        ]

Inherited members

class GroupBy (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark scenarios around individual dataflow patterns/operators

Expand source code Browse git
class GroupBy(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

Inherited members

class GroupByMaintained (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark GroupBy as an indexed view, which renders a dataflow for incremental maintenance, in contrast with one-shot SELECT processing

Expand source code Browse git
class GroupByMaintained(Dataflow):
    """Benchmark GroupBy as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

> CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2;
  /* A */

> CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
{self.n()} 0 {self.n()-1}
"""
        )
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}

> SELECT COUNT(*) = {self.n()} FROM v1
true
"""
            ),
        ]

Inherited members

class HydrateIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for an index to hydrate when a cluster comes online.

Expand source code Browse git
class HydrateIndex(Scenario):
    """Measure the time it takes for an index to hydrate when a cluster comes online."""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        sql = f"""
> DROP TABLE IF EXISTS t1 CASCADE
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER)
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0)
> CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1)
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}
> UPDATE t1 SET f1 = f1 + 100000
> UPDATE t1 SET f1 = f1 + 1000000
> UPDATE t1 SET f1 = f1 + 10000000
> UPDATE t1 SET f1 = f1 + 100000000
> UPDATE t1 SET f1 = f1 + 1000000000
> SELECT 1
  /* A */
1
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = idx_cluster
? EXPLAIN SELECT COUNT(*) FROM t1
Explained Query:
  Return // {{ arity: 1 }}
    Union // {{ arity: 1 }}
      Get l0 // {{ arity: 1 }}
      Map (0) // {{ arity: 1 }}
        Union // {{ arity: 0 }}
          Negate // {{ arity: 0 }}
            Project () // {{ arity: 0 }}
              Get l0 // {{ arity: 1 }}
          Constant // {{ arity: 0 }}
            - ()
  With
    cte l0 =
      Reduce aggregates=[count(*)] // {{ arity: 1 }}
        Project () // {{ arity: 0 }}
          ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}

Used Indexes:
  - materialize.public.i1 (*** full scan ***)

Target cluster: idx_cluster

> SELECT COUNT(*) FROM t1
  /* B */
{self._n}
> SET CLUSTER = default
"""

        if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"):
            sql = remove_arity_information_from_explain(sql)

        if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)

        return Td(sql)

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        sql = f"""
> DROP TABLE IF EXISTS t1 CASCADE
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER)
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0)
> CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1)
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}
> UPDATE t1 SET f1 = f1 + 100000
> UPDATE t1 SET f1 = f1 + 1000000
> UPDATE t1 SET f1 = f1 + 10000000
> UPDATE t1 SET f1 = f1 + 100000000
> UPDATE t1 SET f1 = f1 + 1000000000
> SELECT 1
  /* A */
1
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = idx_cluster
? EXPLAIN SELECT COUNT(*) FROM t1
Explained Query:
  Return // {{ arity: 1 }}
    Union // {{ arity: 1 }}
      Get l0 // {{ arity: 1 }}
      Map (0) // {{ arity: 1 }}
        Union // {{ arity: 0 }}
          Negate // {{ arity: 0 }}
            Project () // {{ arity: 0 }}
              Get l0 // {{ arity: 1 }}
          Constant // {{ arity: 0 }}
            - ()
  With
    cte l0 =
      Reduce aggregates=[count(*)] // {{ arity: 1 }}
        Project () // {{ arity: 0 }}
          ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}

Used Indexes:
  - materialize.public.i1 (*** full scan ***)

Target cluster: idx_cluster

> SELECT COUNT(*) FROM t1
  /* B */
{self._n}
> SET CLUSTER = default
"""

        if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"):
            sql = remove_arity_information_from_explain(sql)

        if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)

        return Td(sql)
def init(self) ‑> list[Action]
Expand source code Browse git
    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1
"""
            ),
        ]

Inherited members

class Insert (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for an INSERT statement to return.

Expand source code Browse git
class Insert(DML):
    """Measure the time it takes for an INSERT statement to return."""

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
  /* B */
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
  /* B */
"""
        )
def init(self) ‑> Action
Expand source code Browse git
def init(self) -> Action:
    return self.table_ten()

Inherited members

class InsertAndSelect (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for an INSERT statement to return AND for a follow-up SELECT to return data, that is, for the dataflow to be completely caught up.

Expand source code Browse git
class InsertAndSelect(DML):
    """Measure the time it takes for an INSERT statement to return
    AND for a follow-up SELECT to return data, that is, for the
    dataflow to be completely caught up.
    """

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()};

> SELECT 1 FROM t1 WHERE f1 = 1
  /* B */
1
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()};

> SELECT 1 FROM t1 WHERE f1 = 1
  /* B */
1
"""
        )
def init(self) ‑> Action
Expand source code Browse git
def init(self) -> Action:
    return self.table_ten()

Inherited members

class InsertBatch (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for a batch of INSERT statements to return.

Expand source code Browse git
class InsertBatch(DML):
    """Measure the time it takes for a batch of INSERT statements to return."""

    SCALE = 4

    def benchmark(self) -> MeasurementSource:
        inserts = "\n".join(
            f"> INSERT INTO t1 VALUES ({i});" for i in range(0, self.n())
        )

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> SET auto_route_introspection_queries TO false

> BEGIN

{inserts}

> COMMIT
  /* B */
"""
        )

Ancestors

Class variables

var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        inserts = "\n".join(
            f"> INSERT INTO t1 VALUES ({i});" for i in range(0, self.n())
        )

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> SET auto_route_introspection_queries TO false

> BEGIN

{inserts}

> COMMIT
  /* B */
"""
        )

Inherited members

class InsertMultiRow (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for a single multi-row INSERT statement to return.

Expand source code Browse git
class InsertMultiRow(DML):
    """Measure the time it takes for a single multi-row INSERT statement to return."""

    SCALE = 5

    def benchmark(self) -> MeasurementSource:
        values = ", ".join(f"({i})" for i in range(0, self.n()))

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 VALUES {values}
  /* B */
"""
        )

Ancestors

Class variables

var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        values = ", ".join(f"({i})" for i in range(0, self.n()))

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

> INSERT INTO t1 VALUES {values}
  /* B */
"""
        )

Inherited members

class Kafka (scale: float, mz_version: MzVersion, default_size: int, seed: int)
Expand source code Browse git
class Kafka(Scenario):
    pass

Ancestors

Subclasses

Inherited members

class KafkaEnvelopeNoneBytes (scale: float, mz_version: MzVersion, default_size: int, seed: int)
Expand source code Browse git
class KafkaEnvelopeNoneBytes(Kafka):
    def shared(self) -> Action:
        data = "a" * 512
        return TdAction(
            f"""
$ kafka-create-topic topic=kafka-envelope-none-bytes

$ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()}
{data}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}')
  FORMAT BYTES
  ENVELOPE NONE
  /* A */

> SELECT COUNT(*) = {self.n()} FROM s1
  /* B */
true
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}')
  FORMAT BYTES
  ENVELOPE NONE
  /* A */

> SELECT COUNT(*) = {self.n()} FROM s1
  /* B */
true
"""
        )
def shared(self) ‑> Action
Expand source code Browse git
    def shared(self) -> Action:
        data = "a" * 512
        return TdAction(
            f"""
$ kafka-create-topic topic=kafka-envelope-none-bytes

$ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()}
{data}
"""
        )

Inherited members

class KafkaEnvelopeNoneBytesScalability (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Run the same scenario across different scales. Do not materialize the entire source but rather just a non-memory-consuming view on top of it.

Expand source code Browse git
@parameterized_class(
    [{"SCALE": i} for i in [5, 6, 7, 8, 9]], class_name_func=Scenario.name_with_scale
)
class KafkaEnvelopeNoneBytesScalability(ScenarioBig):
    """Run the same scenario across different scales. Do not materialize the entire
    source but rather just a non-memory-consuming view on top of it.
    """

    def shared(self) -> list[Action]:
        return [
            TdAction(
                """
$ kafka-create-topic topic=kafka-scalability partitions=8
"""
            ),
            Kgen(
                topic="kafka-scalability",
                args=[
                    "--keys=sequential",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=100",
                    "--min-message-size=100",
                ],
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE NONE
  /* A */

> CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1;

> SELECT c = {self.n()} FROM v1
  /* B */
true
"""
        )
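
The subclass list below is generated by the @parameterized_class decorator shown above: it stamps out one subclass per SCALE value and names it via Scenario.name_with_scale. A hand-rolled sketch of the equivalent effect (for illustration only; the decorator's actual mechanics differ):

# Roughly what @parameterized_class produces here, written out by hand.
for scale in [5, 6, 7, 8, 9]:
    name = f"KafkaEnvelopeNoneBytesScalability_scale_{scale}"
    globals()[name] = type(
        name, (KafkaEnvelopeNoneBytesScalability,), {"SCALE": scale}
    )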

Ancestors

Subclasses

  • materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_5
  • materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_6
  • materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_7
  • materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_8
  • materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_9

Methods

def benchmark(self) ‑> MeasurementSource
Expand source code Browse git
    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE NONE
  /* A */

> CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1;

> SELECT c = {self.n()} FROM v1
  /* B */
true
"""
        )
def shared(self) ‑> list[Action]
Expand source code Browse git
    def shared(self) -> list[Action]:
        return [
            TdAction(
                """
$ kafka-create-topic topic=kafka-scalability partitions=8
"""
            ),
            Kgen(
                topic="kafka-scalability",
                args=[
                    "--keys=sequential",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=100",
                    "--min-message-size=100",
                ],
            ),
        ]

Inherited members

class KafkaRestart (scale: float, mz_version: MzVersion, default_size: int, seed: int)

This scenario dates from the pre-persistence era, when the entire topic was re-ingested from scratch. With persistence, however, no re-ingestion takes place and the scenario exhibits extreme variability. Instead of re-ingestion, we are mostly measuring the speed of COUNT(*), further obscured by the one-second timestamp granularity.

Expand source code Browse git
class KafkaRestart(ScenarioDisabled):
    """This scenario dates from the pre-persistence era where the entire topic was re-ingested from scratch.
    With presistence however, no reingestion takes place and the scenario exhibits extreme variability.
    Instead of re-ingestion, we are measuring mostly the speed of COUNT(*), further obscured by
    the one second timestamp granularity
    """

    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-recovery partitions=8

$ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def init(self) -> Action:
        return TdAction(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT;

# Make sure we are fully caught up before continuing
> SELECT COUNT(*) FROM s1;
{self.n()}

# Give time for any background tasks (e.g. compaction) to settle down
> SELECT mz_unsafe.mz_sleep(10)
<null>
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
> SELECT COUNT(*) /* {self.n()} */ FROM s1;
  /* B */
{self.n()}
"""
            ),
        ]

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
Expand source code Browse git
    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
> SELECT COUNT(*) /* {self.n()} */ FROM s1;
  /* B */
{self.n()}
"""
            ),
        ]
def init(self) ‑> Action
Expand source code Browse git
    def init(self) -> Action:
        return TdAction(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT;

# Make sure we are fully caught up before continuing
> SELECT COUNT(*) FROM s1;
{self.n()}

# Give time for any background tasks (e.g. compaction) to settle down
> SELECT mz_unsafe.mz_sleep(10)
<null>
"""
        )
def shared(self) ‑> Action
Expand source code Browse git
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-recovery partitions=8

$ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

Inherited members

class KafkaRestartBig (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Ingest 100M records without constructing a dataflow that would keep all of them in memory. For this purpose, we emit a bunch of "EOF" records after the primary ingestion is complete and consider that the source has caught up when all the EOF records have been seen.

Expand source code Browse git
class KafkaRestartBig(ScenarioBig):
    """Ingest 100M records without constructing
    a dataflow that would keep all of them in memory. For this purpose, we
    emit a bunch of "EOF" records after the primary ingestion is complete
    and consider that the source has caught up when all the EOF records have
    been seen.
    """

    SCALE = 8

    def shared(self) -> list[Action]:
        return [
            TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"),
            # Ingest 10 ** SCALE records
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=random",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=32",
                    "--min-message-size=32",
                    "--key-min=256",
                    f"--key-max={256+(self.n()**2)}",
                ],
            ),
            # Add 256 EOF markers with key values <= 256.
            # This high number is chosen so as to guarantee that there will be an EOF marker
            # in each partition, even if the number of partitions is increased in the future.
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=sequential",
                    "--num-records=256",
                    "--values=bytes",
                    "--min-message-size=32",
                    "--max-message-size=32",
                ],
            ),
        ]

    def init(self) -> Action:
        return TdAction(
            f"""
>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE UPSERT;

# Confirm that all the EOF markers generated above have been processed
> CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1 WHERE key <= '\\x00000000000000ff'

> SELECT * FROM s1_is_complete;
true
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                """
> SELECT * FROM s1_is_complete
  /* B */
true
"""
            ),
        ]
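
The EOF markers cannot collide with the primary data: the data keys are drawn from [256, 256 + n**2], while the 256 sequential EOF keys occupy [0, 255]. A quick sanity check at the default scale (assuming n() = 10**SCALE, per the "Ingest 10 ** SCALE records" comment above):

SCALE = 8
n = 10**SCALE                # 100,000,000 primary records
data_key_min = 256           # --key-min=256
data_key_max = 256 + n**2    # --key-max=256+(n**2)
eof_keys = range(0, 256)     # --keys=sequential --num-records=256

# Every EOF key is strictly below the smallest data key, so the
# COUNT(*) = 256 check over key <= '\x00000000000000ff' (i.e. key <= 255)
# matches exactly the EOF markers once the source has caught up.
assert max(eof_keys) < data_key_min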

Ancestors

Class variables

var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
Expand source code Browse git
    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                """
> SELECT * FROM s1_is_complete
  /* B */
true
"""
            ),
        ]
def init(self) ‑> Action
Expand source code Browse git
    def init(self) -> Action:
        return TdAction(
            f"""
>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> DROP CLUSTER IF EXISTS source_cluster CASCADE
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}')
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
  ENVELOPE UPSERT;

# Confirm that all the EOF markers generated above have been processed
> CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1 WHERE key <= '\\x00000000000000ff'

> SELECT * FROM s1_is_complete;
true
"""
        )
def shared(self) ‑> list[Action]
Expand source code Browse git
def shared(self) -> list[Action]:
    return [
        TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"),
        # Ingest 10 ** SCALE records
        Kgen(
            topic="kafka-recovery-big",
            args=[
                "--keys=random",
                f"--num-records={self.n()}",
                "--values=bytes",
                "--max-message-size=32",
                "--min-message-size=32",
                "--key-min=256",
                f"--key-max={256+(self.n()**2)}",
            ],
        ),
        # Add 256 EOF markers with key values <= 256.
        # This high number is chosen so as to guarantee that there will be an EOF marker
        # in each partition, even if the number of partitions is increased in the future.
        Kgen(
            topic="kafka-recovery-big",
            args=[
                "--keys=sequential",
                "--num-records=256",
                "--values=bytes",
                "--min-message-size=32",
                "--max-message-size=32",
            ],
        ),
    ]

Inherited members

class KafkaUpsert (scale: float, mz_version: MzVersion, default_size: int, seed: int)
Expand source code Browse git
class KafkaUpsert(Kafka):
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=kafka-upsert

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }}

$ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}}
{{"f1": 2}} {{"f2": 2}}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
    URL '${{testdrive.schema-registry-url}}'
  );

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
  ENVELOPE UPSERT
  /* A */

> SELECT f1 FROM s1
  /* B */
1
2
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def shared(self) ‑> Action

Inherited members

class KafkaUpsertUnique (scale: float, mz_version: MzVersion, default_size: int, seed: int)
class KafkaUpsertUnique(Kafka):
    def shared(self) -> Action:
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
$ kafka-create-topic topic=upsert-unique partitions=16

$ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
> DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
> DROP CLUSTER IF EXISTS source_cluster CASCADE

>[version<7800]  CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
  /* A */

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE UPSERT

> SELECT COUNT(*) FROM s1;
  /* B */
{self.n()}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def shared(self) ‑> Action

Inherited members

class MFPPushdown (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Test MFP pushdown – WHERE clause with a suitable condition and no index defined.

class MFPPushdown(Scenario):
    """Test MFP pushdown -- WHERE clause with a suitable condition and no index defined."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> /* A */ SELECT 1;
1
> /* B */ SELECT * FROM v1 WHERE f2 < 0;
"""
        )
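
The SCALE and FIXED_SCALE class variables pin the dataset size: the inline comment implies the record count is derived as 10 ** SCALE, and fixing the scale means --size=+N has no effect, keeping the scenario clear of the 100M-row range that OOMs. A sketch of the implied relationship, assuming n() resolves to the rounded power (the helper itself lives in the Scenario base class):

SCALE = 7
n = round(10 ** SCALE)  # 10 million rows; per the comment, 10 ** 8 would OOM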

Ancestors

Class variables

var FIXED_SCALE : bool
var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> list[Action]

Inherited members

class ManySmallInserts (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for several small INSERT statements to return.

class ManySmallInserts(DML):
    """Measure the time it takes for several small INSERT statements to return."""

    def init(self) -> Action:
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        random.seed(self.seed())

        statements = []
        for _ in range(0, 10000):
            statements.append(f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})")

        insert_statements_str = "\n".join(statements)

        return Td(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 INTEGER)
  /* A */

{insert_statements_str}
  /* B */
"""
        )
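
Each generated > line is a separate testdrive statement, so the A-to-B interval effectively measures ten thousand individual client round trips rather than one bulk insert. A small illustrative reconstruction of the generated script's shape:

import random

random.seed(1)  # hypothetical seed; the scenario seeds with self.seed()
script = "\n".join(
    f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})" for _ in range(3)
)
print(script)   # three single-row INSERTs here; the real scenario emits 10000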

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> Action

Inherited members

class ManySmallUpdates (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for several small UPDATE statements to return to client

class ManySmallUpdates(DML):
    """Measure the time it takes for several small UPDATE statements to return to client"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE TABLE t1 (f1 INT, f2 INT);

> CREATE DEFAULT INDEX ON t1;

> INSERT INTO t1 SELECT generate_series(1, 10);
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        random.seed(self.seed())

        statements = []
        for _ in range(0, 10000):
            statements.append(
                f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}"
            )
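            # Note: randint(0, 10) is inclusive of 10, but f1 % 10 is never 10,
            # so roughly one in eleven UPDATE statements matches no rows.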

        update_statements_str = "\n".join(statements)

        return Td(
            f"""
> SELECT 1
  /* A */
1

{update_statements_str}
  /* B */
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> list[Action]

Inherited members

class MinMax (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark scenarios around individual dataflow patterns/operators

class MinMax(Dataflow):
    def init(self) -> list[Action]:
        return [
            self.view_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> SELECT MIN(f1), MAX(f1) AS f1 FROM v1
  /* B */
0 {self.n()-1}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> list[Action]

Inherited members

class MinMaxMaintained (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark MinMax as an indexed view, which renders a dataflow for incremental maintenance, in contrast with one-shot SELECT processing

class MinMaxMaintained(Dataflow):
    """Benchmark MinMax as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> DROP VIEW IF EXISTS v2
  /* A */

> CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1

> CREATE DEFAULT INDEX ON v2

> SELECT * FROM v2
  /* B */
0 {self.n()-1}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> list[Action]

Inherited members

class MySqlCdc (scale: float, mz_version: MzVersion, default_size: int, seed: int)
class MySqlCdc(Scenario):
    pass

Ancestors

Subclasses

Inherited members

class MySqlInitialLoad (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to read 1M existing records from MySQL when creating a materialized source

class MySqlInitialLoad(MySqlCdc):
    """Measure the time it takes to read 1M existing records from MySQL
    when creating a materialized source"""

    FIXED_SCALE = True  # TODO: Remove when #25323 is fixed

    def shared(self) -> Action:
        return TdAction(
            f"""
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;

SET @i:=0;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()};
"""
        )

    def before(self) -> Action:
        return TdAction(
            """
> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
            """
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
    HOST mysql,
    USER root,
    PASSWORD SECRET mysqlpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE mz_source_mysqlcdc
  IN CLUSTER source_cluster
  FROM MYSQL CONNECTION mysql_conn
  FOR TABLES (public.pk_table)
  /* A */

> SELECT count(*) FROM pk_table
  /* B */
{self.n()}
            """
        )
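
The INSERT in shared() manufactures its million rows without a dedicated numbers table: cross-joining mysql.time_zone with itself yields far more rows than needed, LIMIT trims the stream to n(), and the @i user variable numbers the rows as they go. The same idea expressed in Python, with a stand-in iterable for the time_zone table:

from itertools import islice, product

tz = range(2000)   # stand-in for mysql.time_zone's rows
n = 10 ** 4        # illustrative size; the scenario uses self.n()
pk_table = [(i + 1, (i + 1) ** 2)
            for i, _ in enumerate(islice(product(tz, tz), n))]
assert len(pk_table) == n and pk_table[:2] == [(1, 1), (2, 4)]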

Ancestors

Class variables

var FIXED_SCALE : bool

Methods

def before(self) ‑> Action
def benchmark(self) ‑> MeasurementSource
def shared(self) ‑> Action

Inherited members

class MySqlStreaming (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to ingest records from MySQL post-snapshot

class MySqlStreaming(MySqlCdc):
    """Measure the time it takes to ingest records from MySQL post-snapshot"""

    SCALE = 5

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        return version >= MzVersion.parse_mz("v0.88.0-dev")

    def shared(self) -> Action:
        return TdAction(
            """
$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
"""
        )

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;

$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);

> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
    HOST mysql,
    USER root,
    PASSWORD SECRET mysqlpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM MYSQL CONNECTION mysql_conn
  FOR TABLES (public.t1)
            """
        )

    def benchmark(self) -> MeasurementSource:
        insertions = "\n".join(
            [
                dedent(
                    f"""
                    SET @i:=0;
                    INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {round(self.n()/1000)};
                    COMMIT;
                    """
                )
                for i in range(0, 1000)
            ]
        )

        return Td(
            f"""
> SELECT 1;
  /* A */
1

$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}

$ mysql-execute name=mysql
USE public;
{insertions}

> SELECT count(*) FROM t1
  /* B */
{self.n()}
            """
        )
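
benchmark() splits the post-snapshot ingestion into 1000 separate committed transactions of round(n/1000) rows each, so the final COUNT(*) matches n() exactly whenever n() is a multiple of 1000. Illustrative arithmetic, assuming n() == 10 ** SCALE:

SCALE = 5
n = 10 ** SCALE                          # 100000 rows in total
batches, rows_per_batch = 1000, round(n / 1000)
assert batches * rows_per_batch == n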

Ancestors

Class variables

var SCALE : float

Static methods

def can_run(version: MzVersion) ‑> bool

Methods

def before(self) ‑> Action
def benchmark(self) ‑> MeasurementSource
def shared(self) ‑> Action

Inherited members

class OrderBy (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark ORDER BY as executed by the dataflow layer, in contrast with an ORDER BY executed using a Finish step in the coordinator

class OrderBy(Dataflow):
    """Benchmark ORDER BY as executed by the dataflow layer,
    in contrast with an ORDER BY executed using a Finish step in the coordinator"""

    def init(self) -> Action:
        # Just to spice things up a bit, we perform individual
        # inserts here so that the rows are assigned separate timestamps
        inserts = "\n\n".join(f"> INSERT INTO ten VALUES ({i})" for i in range(0, 10))

        return TdAction(
            f"""
> CREATE TABLE ten (f1 INTEGER);

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};

{inserts}

> SELECT COUNT(*) = {self.n()} FROM v1;
true
"""
        )

    def benchmark(self) -> MeasurementSource:
        # Explicit LIMIT is needed for the ORDER BY to not be optimized away
        return Td(
            f"""
> DROP MATERIALIZED VIEW IF EXISTS v2
  /* A */

> CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999

> SELECT COUNT(*) FROM v2
  /* B */
{self.n()}
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> Action

Inherited members

class PgCdc (scale: float, mz_version: MzVersion, default_size: int, seed: int)
class PgCdc(Scenario):
    pass

Ancestors

Subclasses

Inherited members

class PgCdcInitialLoad (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to read 1M existing records from Postgres when creating a materialized source

class PgCdcInitialLoad(PgCdc):
    """Measure the time it takes to read 1M existing records from Postgres
    when creating a materialized source"""

    def shared(self) -> Action:
        return TdAction(
            f"""
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;

DROP PUBLICATION IF EXISTS mz_source;
CREATE PUBLICATION mz_source FOR ALL TABLES;

CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x;
ALTER TABLE pk_table REPLICA IDENTITY FULL;
"""
        )

    def before(self) -> Action:
        return TdAction(
            """
> DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
            """
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'

> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
    HOST postgres,
    DATABASE postgres,
    USER postgres,
    PASSWORD SECRET pgpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE mz_source_pgcdc
  IN CLUSTER source_cluster
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source')
  FOR TABLES ("pk_table")
  /* A */

> SELECT count(*) FROM pk_table
  /* B */
{self.n()}
            """
        )

Ancestors

Methods

def before(self) ‑> Action
def benchmark(self) ‑> MeasurementSource
def shared(self) ‑> Action

Inherited members

class PgCdcStreaming (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to ingest records from Postgres post-snapshot

class PgCdcStreaming(PgCdc):
    """Measure the time it takes to ingest records from Postgres post-snapshot"""

    SCALE = 5

    def shared(self) -> Action:
        return TdAction(
            """
$ postgres-execute connection=postgres://postgres:postgres@postgres
ALTER USER postgres WITH replication;
DROP SCHEMA IF EXISTS public CASCADE;
CREATE SCHEMA public;

DROP PUBLICATION IF EXISTS p1;
CREATE PUBLICATION p1 FOR ALL TABLES;
"""
        )

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;

$ postgres-execute connection=postgres://postgres:postgres@postgres
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
ALTER TABLE t1 REPLICA IDENTITY FULL;

> CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'

> CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
    HOST postgres,
    DATABASE postgres,
    USER postgres,
    PASSWORD SECRET pgpass
  )

> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s1
  IN CLUSTER source_cluster
  FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1')
  FOR TABLES ("t1")
            """
        )

    def benchmark(self) -> MeasurementSource:
        insertions = "\n".join(
            [
                f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {self.n()/1000}) as x;\nCOMMIT;"
                for i in range(0, 1000)
            ]
        )

        return Td(
            f"""
> SELECT 1;
  /* A */
1

$ postgres-execute connection=postgres://postgres:postgres@postgres
{insertions}

> SELECT count(*) FROM t1
  /* B */
{self.n()}
            """
        )

Ancestors

Class variables

var SCALE : float

Methods

def before(self) ‑> Action
def benchmark(self) ‑> MeasurementSource
def shared(self) ‑> Action

Inherited members

class QueryLatency (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Feature benchmarks pertaining to the coordinator.

class QueryLatency(Coordinator):
    SCALE = 3
    """Measure the time it takes to run SELECT 1 queries"""

    def benchmark(self) -> MeasurementSource:
        selects = "\n".join("> SELECT 1\n1\n" for i in range(0, self.n()))

        return Td(
            f"""
> SET auto_route_introspection_queries TO false

> BEGIN

> SELECT 1;
  /* A */
1

{selects}

> SELECT 1;
  /* B */
1
"""
        )
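
With SCALE = 3 the generated script runs n() = 1000 SELECT 1 statements between the two markers, inside an explicit transaction (presumably so every query reuses the transaction's snapshot instead of picking a fresh timestamp); the per-query latency falls out of simple division. A sketch of that arithmetic:

n = 10 ** 3                              # SCALE = 3
elapsed_a_to_b = 0.75                    # hypothetical measurement, in seconds
per_query_latency = elapsed_a_to_b / n   # about 0.75 ms per SELECT 1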

Ancestors

Class variables

var SCALE : float

Measure the time it takes to run SELECT 1 queries

Methods

def benchmark(self) ‑> MeasurementSource

Inherited members

class Retraction (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Benchmark the time it takes to process a very large retraction

class Retraction(Dataflow):
    """Benchmark the time it takes to process a very large retraction"""

    def before(self) -> Action:
        return TdAction(
            f"""
> DROP TABLE IF EXISTS ten CASCADE;

> CREATE TABLE ten (f1 INTEGER);

> INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);

> CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}

> SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
true
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            """
> SELECT 1
  /* A */
1

> DELETE FROM ten;

> SELECT COUNT(*) FROM v1
  /* B */
0
"""
        )
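
DELETE FROM ten removes only ten base rows, but every row of v1 derives from them, so the dataflow must retract the entire view before the final COUNT(*) can report 0; that retraction is what the A-to-B interval captures. Illustrative arithmetic, under the assumption that the join() helper crosses ten with itself SCALE times so that n() == 10 ** SCALE:

base_rows_deleted = 10
SCALE = 7                                # hypothetical scale for this scenario
rows_retracted_from_v1 = 10 ** SCALE     # i.e. self.n() rows retracted at once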

Ancestors

Methods

def before(self) ‑> Action
def benchmark(self) ‑> MeasurementSource

Inherited members

class Sink (scale: float, mz_version: MzVersion, default_size: int, seed: int)
class Sink(Scenario):
    pass

Ancestors

Subclasses

Inherited members

class Startup (scale: float, mz_version: MzVersion, default_size: int, seed: int)
class Startup(Scenario):
    pass

Ancestors

Subclasses

Inherited members

class StartupEmpty (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to restart an empty Mz instance.

class StartupEmpty(Startup):
    """Measure the time it takes to restart an empty Mz instance."""

    def benchmark(self) -> BenchmarkingSequence:
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                """
> SELECT 1;
  /* B */
1
"""
            ),
        ]

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]

Inherited members

class StartupLoaded (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something

class StartupLoaded(Scenario):
    """Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something"""

    SCALE = 1.2  # 25 objects of each kind
    FIXED_SCALE = (
        True  # Can not scale to 100s of objects, so --size=+N will have no effect
    )

    def shared(self) -> Action:
        return TdAction(
            self.schema()
            + """
$ kafka-create-topic topic=startup-time

$ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1
{"f2": 1}
"""
        )

    def init(self) -> Action:
        create_tables = "\n".join(
            f"> CREATE TABLE t{i} (f1 INTEGER);\n> INSERT INTO t{i} DEFAULT VALUES;"
            for i in range(0, self.n())
        )
        create_sources = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS source{i}_cluster CASCADE;
> CREATE CLUSTER source{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE source{i}
  IN CLUSTER source{i}_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE NONE
"""
            for i in range(0, self.n())
        )
        join = " ".join(
            f"LEFT JOIN source{i} USING (f2)" for i in range(1, (ceil(self.scale())))
        )

        create_views = "\n".join(
            f"> CREATE MATERIALIZED VIEW v{i} AS SELECT * FROM source{i} AS s {join} LIMIT {i+1}"
            for i in range(0, self.n())
        )

        create_sinks = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS sink{i}_cluster;
> CREATE CLUSTER sink{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink{i}
  IN CLUSTER sink{i}_cluster
  FROM source{i}
  INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE DEBEZIUM
"""
            for i in range(0, self.n())
        )

        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};

> DROP OWNED BY materialize CASCADE;

>[version<7800]  CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

{create_tables}
{create_sources}
{create_views}
{create_sinks}
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        check_tables = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM t{i}\ntrue" for i in range(0, self.n())
        )
        check_sources = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM source{i}\ntrue" for i in range(0, self.n())
        )
        check_views = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM v{i}\ntrue" for i in range(0, self.n())
        )

        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
{check_views}
{check_sources}
{check_tables}
> SELECT 1;
  /* B */
1
"""
            ),
        ]

Ancestors

Class variables

var FIXED_SCALE : bool
var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
def init(self) ‑> Action
def shared(self) ‑> Action

Inherited members

class SwapSchema (scale: float, mz_version: MzVersion, default_size: int, seed: int)
class SwapSchema(Scenario):
    SCALE = 2
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        blue_views_on_table = "\n".join(
            f"> CREATE VIEW blue.v{i} AS SELECT * FROM blue.t1;"
            for i in range(0, self.n())
        )

        green_views_on_table = "\n".join(
            f"> CREATE VIEW green.v{i} AS SELECT * FROM green.t1;"
            for i in range(0, self.n())
        )

        noise_views_on_blue_view = "\n".join(
            f"> CREATE VIEW noise.v{i} AS SELECT * FROM blue.v0;"
            for i in range(0, self.n())
        )

        noise_views_on_noise_view = "\n".join(
            f"> CREATE VIEW noise.extra_v{i} AS SELECT * FROM noise.v0;"
            for i in range(0, self.n())
        )

        return [
            TdAction(
                f"""
> CREATE SCHEMA blue;
> CREATE SCHEMA green;
> CREATE SCHEMA noise;

> CREATE TABLE blue.t1 (a int, b text);
> CREATE TABLE green.t1 (a int, b text);

{blue_views_on_table}
{green_views_on_table}
{noise_views_on_blue_view}
{noise_views_on_noise_view}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                """
                > SELECT 1;
                  /* A */
                1

                > ALTER SCHEMA blue SWAP WITH green;

                > SELECT 1;
                  /* B */
                1
                """
            )
        )
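
ALTER SCHEMA ... SWAP WITH atomically exchanges the two schema names, which is the classic blue/green cutover; the views built by init() exist to give the catalog a realistically deep dependency graph to rename across. With SCALE = 2 and assuming n() == 10 ** SCALE, the scenario creates 100 views in each of the four groups:

n = 10 ** 2
views_created = {"blue.v*": n, "green.v*": n, "noise.v*": n, "noise.extra_v*": n}
assert sum(views_created.values()) == 400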

Ancestors

Class variables

var FIXED_SCALE : bool
var SCALE : float

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> list[Action]

Inherited members

class Update (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes for an UPDATE statement to return to client

class Update(DML):
    """Measure the time it takes for an UPDATE statement to return to client"""

    def init(self) -> list[Action]:
        return [
            self.table_ten(),
            TdAction(
                f"""
> CREATE TABLE t1 (f1 BIGINT);

> CREATE DEFAULT INDEX ON t1;

> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> UPDATE t1 SET f1 = f1 + {self.n()}
  /* B */
"""
        )

Ancestors

Methods

def benchmark(self) ‑> MeasurementSource
def init(self) ‑> list[Action]

Inherited members

class UpdateMultiNoIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)

Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub Issue #11071

class UpdateMultiNoIndex(DML):
    """Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub Issue #11071"""

    def before(self) -> Action:
        # Due to extreme variability in the results, we have no option but to drop and re-create
        # the table prior to each measurement
        return TdAction(
            f"""
> DROP TABLE IF EXISTS t1;

> CREATE TABLE t1 (f1 BIGINT);

> INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
"""
        )

    def benchmark(self) -> MeasurementSource:
        return Td(
            f"""
> SELECT 1
  /* A */
1

> UPDATE t1 SET f1 = f1 + {self.n()}

> SELECT COUNT(*) FROM t1 WHERE f1 > {self.n()}
  /* B */
{self.n()}
"""
        )

Ancestors

Methods

def before(self) ‑> Action
def benchmark(self) ‑> MeasurementSource

Inherited members