# Module materialize.feature_benchmark.scenarios.benchmark_main
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
import re
from math import ceil, floor
from textwrap import dedent
from parameterized import parameterized_class # type: ignore
from materialize.feature_benchmark.action import Action, Kgen, TdAction
from materialize.feature_benchmark.measurement_source import (
Lambda,
MeasurementSource,
Td,
)
from materialize.feature_benchmark.scenario import (
BenchmarkingSequence,
Scenario,
ScenarioBig,
ScenarioDisabled,
)
from materialize.mz_version import MzVersion
# pdoc configuration: names mapped to False in this dict are excluded from the
# generated documentation (used below for parameterized scenario variants).
__pdoc__ = {}
class FastPath(Scenario):
    """Feature benchmarks related to the "fast path" in query execution, as described in the
    'Internals of One-off Queries' presentation.
    """
class FastPathFilterNoIndex(FastPath):
    """Measure the time it takes for the fast path to filter out all rows from a materialized view and return"""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        """Create an indexed materialized view and wait until it is fully populated."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});
                > CREATE DEFAULT INDEX ON v1;
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time a SELECT whose filter (f2 < 0) matches no rows; the /* A */ and /* B */
        markers delimit the measured interval."""
        return Td(
            """
            > /* A */ SELECT 1;
            1
            > /* B */ SELECT * FROM v1 WHERE f2 < 0;
            """
        )
class MFPPushdown(Scenario):
    """Test MFP pushdown -- WHERE clause with a suitable condition and no index defined."""

    SCALE = 7
    FIXED_SCALE = True  # OOM with 10**8 = 100M records

    def init(self) -> list[Action]:
        """Create an un-indexed materialized view and wait until it is fully populated."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()});
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time a filter that matches no rows; without an index, timing reflects MFP pushdown."""
        return Td(
            """
            > /* A */ SELECT 1;
            1
            > /* B */ SELECT * FROM v1 WHERE f2 < 0;
            """
        )
class FastPathFilterIndex(FastPath):
    """Measure the time it takes for the fast path to filter out all rows from a
    materialized view using an index and return."""

    def init(self) -> list[Action]:
        """Create an indexed materialized view and wait until it is fully populated."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
                > CREATE DEFAULT INDEX ON v1;
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    # Since an individual query of this particular type being benchmarked takes ~1ms to
    # execute, the results are susceptible to a lot of random noise. As we can not make
    # the query any slower by using e.g. a large dataset, we run the query 1000 times in
    # a row and measure the total execution time.
    def benchmark(self) -> MeasurementSource:
        """Time 1000 repeated indexed point lookups issued inside a single transaction."""
        # NOTE(review): the local was previously named `hundred_selects` while actually
        # containing 1000 statements; renamed to match the real count.
        repeated_selects = "\n".join(
            "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for _ in range(1000)
        )
        return Td(
            f"""
            > SET auto_route_introspection_queries TO false
            > BEGIN
            > SELECT 1;
            /* A */
            1
            {repeated_selects}
            > SELECT 1
            /* B */
            1
            """
        )
class FastPathOrderByLimit(FastPath):
    """Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>"""

    def init(self) -> list[Action]:
        """Create an indexed materialized view and wait until it is fully populated."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
                > CREATE DEFAULT INDEX ON v1;
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time an ORDER BY ... DESC LIMIT 1000 query; the expected output (the top
        1000 values, ascending as emitted by testdrive) is appended to the script."""
        return Td(
            """
            > SELECT 1;
            /* A */
            1
            > SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000
            /* B */
            """
            + "\n".join([str(x) for x in range(self.n() - 1000, self.n())])
        )
class FastPathLimit(FastPath):
    """Benchmark the case SELECT * FROM source LIMIT <i> , optimized by #21615"""

    def init(self) -> list[Action]:
        """Create a materialized view over a generate_series source."""
        return [
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()})
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time a LIMIT 100 query; the 100 expected rows are appended after the
        dedented script body."""
        return Td(
            dedent(
                """
                > SELECT 1;
                /* A */
                1
                > SELECT * FROM v1 LIMIT 100
                /* B */
                """
            )
            + "\n".join([str(x) for x in range(1, 101)])
        )
class DML(Scenario):
    """Benchmarks around the performance of DML statements"""

    pass
class Insert(DML):
    """Measure the time it takes for an INSERT statement to return."""

    def init(self) -> Action:
        """Set up the ten-row helper table used to generate rows via self.join()."""
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        """Time a single bulk INSERT ... SELECT; the table is recreated each run."""
        return Td(
            f"""
            > DROP TABLE IF EXISTS t1;
            > CREATE TABLE t1 (f1 INTEGER)
            /* A */
            > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
            /* B */
            """
        )
class ManySmallInserts(DML):
    """Measure the time it takes for several small INSERT statements to return."""

    def init(self) -> Action:
        """Set up the ten-row helper table used by the scenario framework."""
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        """Time 10,000 single-row INSERTs issued back to back.

        The RNG is seeded deterministically so every run inserts the same values.
        """
        random.seed(self.seed())
        insert_statements_str = "\n".join(
            f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})"
            for _ in range(10000)
        )
        return Td(
            f"""
            > DROP TABLE IF EXISTS t1;
            > CREATE TABLE t1 (f1 INTEGER)
            /* A */
            {insert_statements_str}
            /* B */
            """
        )
class InsertBatch(DML):
    """Measure the time it takes for a batch of INSERT statements to return."""

    SCALE = 4

    def benchmark(self) -> MeasurementSource:
        """Time n single-row INSERTs wrapped in one explicit transaction."""
        insert_lines = [f"> INSERT INTO t1 VALUES ({v});" for v in range(self.n())]
        batched_inserts = "\n".join(insert_lines)
        return Td(
            f"""
            > DROP TABLE IF EXISTS t1;
            > CREATE TABLE t1 (f1 INTEGER)
            /* A */
            > SET auto_route_introspection_queries TO false
            > BEGIN
            {batched_inserts}
            > COMMIT
            /* B */
            """
        )
class InsertMultiRow(DML):
    """Measure the time it takes for a single multi-row INSERT statement to return."""

    SCALE = 5

    def benchmark(self) -> MeasurementSource:
        """Time one INSERT statement carrying n value tuples."""
        row_literals = [f"({v})" for v in range(self.n())]
        values = ", ".join(row_literals)
        return Td(
            f"""
            > DROP TABLE IF EXISTS t1;
            > CREATE TABLE t1 (f1 INTEGER)
            /* A */
            > INSERT INTO t1 VALUES {values}
            /* B */
            """
        )
class Update(DML):
    """Measure the time it takes for an UPDATE statement to return to client"""

    def init(self) -> list[Action]:
        """Create and populate an indexed table to update against."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE TABLE t1 (f1 BIGINT);
                > CREATE DEFAULT INDEX ON t1;
                > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time a single UPDATE touching every row."""
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            > UPDATE t1 SET f1 = f1 + {self.n()}
            /* B */
            """
        )
class ManySmallUpdates(DML):
    """Measure the time it takes for several small UPDATE statements to return to client"""

    def init(self) -> list[Action]:
        """Create and populate a small indexed two-column table."""
        return [
            self.table_ten(),
            TdAction(
                """
                > CREATE TABLE t1 (f1 INT, f2 INT);
                > CREATE DEFAULT INDEX ON t1;
                > INSERT INTO t1 SELECT generate_series(1, 10);
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time 10,000 small UPDATEs; the RNG is seeded so every run issues
        the same statement sequence."""
        random.seed(self.seed())
        update_statements_str = "\n".join(
            f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}"
            for _ in range(10000)
        )
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            {update_statements_str}
            /* B */
            """
        )
class UpdateMultiNoIndex(DML):
    """Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub Issue #11071"""

    def before(self) -> Action:
        # Due to extreme variability in the results, we have no option but to drop and re-create
        # the table prior to each measurement
        return TdAction(
            f"""
            > DROP TABLE IF EXISTS t1;
            > CREATE TABLE t1 (f1 BIGINT);
            > INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time an all-rows UPDATE followed by a SELECT that confirms the update landed."""
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            > UPDATE t1 SET f1 = f1 + {self.n()}
            > SELECT COUNT(*) FROM t1 WHERE f1 > {self.n()}
            /* B */
            {self.n()}
            """
        )
class InsertAndSelect(DML):
    """Measure the time it takes for an INSERT statement to return
    AND for a follow-up SELECT to return data, that is, for the
    dataflow to be completely caught up.
    """

    def init(self) -> Action:
        """Set up the ten-row helper table used to generate rows via self.join()."""
        return self.table_ten()

    def benchmark(self) -> MeasurementSource:
        """Time the INSERT plus the first SELECT that observes the inserted data."""
        return Td(
            f"""
            > DROP TABLE IF EXISTS t1;
            > CREATE TABLE t1 (f1 INTEGER)
            /* A */
            > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()};
            > SELECT 1 FROM t1 WHERE f1 = 1
            /* B */
            1
            """
        )
class Dataflow(Scenario):
    """Benchmark scenarios around individual dataflow patterns/operators"""

    pass
class OrderBy(Dataflow):
    """Benchmark ORDER BY as executed by the dataflow layer,
    in contrast with an ORDER BY executed using a Finish step in the coordinator"""

    def init(self) -> Action:
        """Create the base table and source view, inserting rows one at a time."""
        # Just to spice things up a bit, we perform individual
        # inserts here so that the rows are assigned separate timestamps
        insert_lines = [f"> INSERT INTO ten VALUES ({v})" for v in range(10)]
        inserts = "\n\n".join(insert_lines)
        return TdAction(
            f"""
            > CREATE TABLE ten (f1 INTEGER);
            > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
            {inserts}
            > SELECT COUNT(*) = {self.n()} FROM v1;
            true
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time the creation and hydration of a materialized view with an ORDER BY."""
        # Explicit LIMIT is needed for the ORDER BY to not be optimized away
        return Td(
            f"""
            > DROP MATERIALIZED VIEW IF EXISTS v2
            /* A */
            > CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999
            > SELECT COUNT(*) FROM v2
            /* B */
            {self.n()}
            """
        )
class CountDistinct(Dataflow):
    """Benchmark a one-shot COUNT(DISTINCT ...) over a materialized view."""

    def init(self) -> list[Action]:
        """Create and fully populate the materialized view to query."""
        return [
            self.view_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()};
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time the COUNT(DISTINCT f1) query."""
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            > SELECT COUNT(DISTINCT f1) AS f1 FROM v1
            /* B */
            {self.n()}
            """
        )
class MinMax(Dataflow):
    """Benchmark a one-shot MIN/MAX aggregation over a materialized view."""

    def init(self) -> list[Action]:
        """Create and fully populate the materialized view to query."""
        return [
            self.view_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time a SELECT MIN(f1), MAX(f1) query; values range 0 .. n-1."""
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            > SELECT MIN(f1), MAX(f1) AS f1 FROM v1
            /* B */
            0 {self.n()-1}
            """
        )
class MinMaxMaintained(Dataflow):
    """Benchmark MinMax as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        """Create and fully populate the materialized view the indexed view reads."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()};
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time creating the MIN/MAX view, indexing it, and reading the maintained result."""
        return Td(
            f"""
            > DROP VIEW IF EXISTS v2
            /* A */
            > CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1
            > CREATE DEFAULT INDEX ON v2
            > SELECT * FROM v2
            /* B */
            0 {self.n()-1}
            """
        )
class GroupBy(Dataflow):
    """Benchmark a one-shot GROUP BY aggregation over a materialized view."""

    def init(self) -> list[Action]:
        """Create and fully populate the materialized view to query."""
        return [
            self.view_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
                > SELECT COUNT(*) = {self.n()} FROM v1
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time a two-level aggregation (per-group MIN/MAX, then a global rollup)."""
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            > SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
            /* B */
            {self.n()} 0 {self.n()-1}
            """
        )
class GroupByMaintained(Dataflow):
    """Benchmark GroupBy as an indexed view, which renders a dataflow for incremental
    maintenance, in contrast with one-shot SELECT processing"""

    def init(self) -> list[Action]:
        """Create and fully populate the materialized view the indexed view reads."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
                > SELECT COUNT(*) = {self.n()} FROM v1
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time creating the aggregation view, indexing it, and reading the maintained result."""
        return Td(
            f"""
            > DROP VIEW IF EXISTS v2;
            /* A */
            > CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2)
            > CREATE DEFAULT INDEX ON v2
            > SELECT * FROM v2
            /* B */
            {self.n()} 0 {self.n()-1}
            """
        )
class CrossJoin(Dataflow):
    """Benchmark materializing the cross-join produced by self.join()."""

    def init(self) -> Action:
        """Set up the ten-row helper view used to generate rows via self.join()."""
        return self.view_ten()

    def benchmark(self) -> MeasurementSource:
        """Time creating and fully hydrating the cross-join materialized view."""
        return Td(
            f"""
            > DROP MATERIALIZED VIEW IF EXISTS v1;
            > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
            /* A */
            > SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
            /* B */
            true
            """
        )
class AccumulateReductions(Dataflow):
    """Benchmark the accumulation of reductions."""

    def before(self) -> Action:
        """Recreate the source table, the `data` view, the dedicated index cluster,
        and the `accumulable` aggregation view prior to each measurement."""
        return TdAction(
            """
            > DROP TABLE IF EXISTS t CASCADE;
            > CREATE TABLE t (a int, b int, c int, d int);
            > CREATE MATERIALIZED VIEW data AS
              SELECT a, a AS b FROM generate_series(1, 10000000) AS a
              UNION ALL
              SELECT a, b FROM t;
            > INSERT INTO t (a, b) VALUES (1, 1);
            > INSERT INTO t (a, b) VALUES (0, 0);
            > DROP CLUSTER IF EXISTS idx_cluster CASCADE;
            > CREATE CLUSTER idx_cluster SIZE '1-8G', REPLICATION FACTOR 1;
            > CREATE VIEW accumulable AS
              SELECT
                a,
                sum(a) AS sum_a, count(a) as cnt_a,
                sum(b) AS sum_b, count(b) as cnt_b
              FROM data
              GROUP BY a;
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time building the index on `accumulable` and counting through it.

        The `?` block pins the expected EXPLAIN plan so the benchmark fails loudly
        if the plan shape changes.
        """
        sql = """
            > SELECT 1
            /* A */
            1
            > CREATE INDEX i_accumulable IN CLUSTER idx_cluster ON accumulable(a);
            > SET CLUSTER = idx_cluster;
            ? EXPLAIN SELECT count(*) FROM accumulable;
            Explained Query:
              Return // { arity: 1 }
                Union // { arity: 1 }
                  Get l0 // { arity: 1 }
                  Map (0) // { arity: 1 }
                    Union // { arity: 0 }
                      Negate // { arity: 0 }
                        Project () // { arity: 0 }
                          Get l0 // { arity: 1 }
                      Constant // { arity: 0 }
                        - ()
              With
                cte l0 =
                  Reduce aggregates=[count(*)] // { arity: 1 }
                    Project () // { arity: 0 }
                      ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 }
            Used Indexes:
              - materialize.public.i_accumulable (*** full scan ***)
            Target cluster: idx_cluster
            > SELECT count(*) FROM accumulable;
            /* B */
            10000001
            > SET CLUSTER = default;
            """
        # Older versions do not print arity / target-cluster annotations in EXPLAIN;
        # strip them so the expected plan still matches.
        # NOTE(review): remove_arity_information_from_explain and
        # remove_target_cluster_from_explain appear to be helpers defined elsewhere
        # in this module — confirm.
        if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"):
            sql = remove_arity_information_from_explain(sql)
        if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)
        return Td(sql)
class Retraction(Dataflow):
    """Benchmark the time it takes to process a very large retraction"""

    def before(self) -> Action:
        """Recreate the base table and dependent view before each measurement,
        since the benchmark destroys the data by deleting it."""
        return TdAction(
            f"""
            > DROP TABLE IF EXISTS ten CASCADE;
            > CREATE TABLE ten (f1 INTEGER);
            > INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9);
            > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()}
            > SELECT COUNT(*) = {self.n()} AS f1 FROM v1;
            true
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time deleting the base rows and observing the view drop to zero rows."""
        return Td(
            """
            > SELECT 1
            /* A */
            1
            > DELETE FROM ten;
            > SELECT COUNT(*) FROM v1
            /* B */
            0
            """
        )
class CreateIndex(Dataflow):
    """Measure the time it takes for CREATE INDEX to return *plus* the time
    it takes for a SELECT query that would use the index to return rows.
    """

    def init(self) -> list[Action]:
        """Create and populate the table, waiting for full hydration."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER);
                > INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}

                # Make sure the dataflow is fully hydrated
                > SELECT 1 FROM t1 WHERE f1 = 0;
                1
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time index creation plus a self-join SELECT that uses the new index."""
        return Td(
            """
            > DROP INDEX IF EXISTS i1;
            /* A */
            > CREATE INDEX i1 ON t1(f1);
            > SELECT COUNT(*)
              FROM t1 AS a1, t1 AS a2
              WHERE a1.f1 = a2.f1
              AND a1.f1 = 0
              AND a2.f1 = 0
            /* B */
            1
            """
        )
class DeltaJoin(Dataflow):
    """Benchmark a one-shot three-way self-join (the delta-join pattern)."""

    def init(self) -> list[Action]:
        """Create the materialized view joined against itself in the benchmark."""
        return [
            self.view_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time the three-way join SELECT."""
        return Td(
            f"""
            > SELECT 1;
            /* A */
            1

            # Delta joins require 3+ tables
            > SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
            /* B */
            {self.n()}
            """
        )
class DeltaJoinMaintained(Dataflow):
    """Benchmark DeltaJoin as an indexed view with table-based data initialization, where the
    empty frontier is not emitted, in contrast with one-shot SELECT processing based on data
    initialized as a constant view"""

    def init(self) -> list[Action]:
        """Create the materialized view joined against itself in the benchmark."""
        return [
            self.table_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time creating the three-way-join view, indexing it, and reading the result."""
        return Td(
            f"""
            > DROP VIEW IF EXISTS v2;
            /* A */

            # Delta joins require 3+ tables
            > CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1
            > CREATE DEFAULT INDEX ON v2
            > SELECT * FROM v2
            /* B */
            {self.n()}
            """
        )
class DifferentialJoin(Dataflow):
    """Benchmark a one-shot two-way self-join (the differential-join pattern)."""

    def init(self) -> list[Action]:
        """Create the materialized view joined against itself in the benchmark."""
        return [
            self.view_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time the two-way join SELECT."""
        return Td(
            f"""
            > SELECT 1;
            /* A */
            1

            > SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1)
            /* B */
            {self.n()}
            """
        )
class FullOuterJoin(Dataflow):
    """Benchmark maintaining a FULL OUTER self-join as a materialized view."""

    def benchmark(self) -> BenchmarkingSequence:
        """Build the join view, then time its hydration as rows arrive one by one.

        The join width (number of equated columns) grows with the scenario scale.
        """
        width = floor(self.scale())
        columns_select = ", ".join(f"a{c}.f1 AS f{c}" for c in range(1, width + 1))
        columns_using = ", ".join(f"f{c}" for c in range(1, width + 1))
        inserts = "\n".join(f"> INSERT INTO ten VALUES ({v})" for v in range(1, 11))
        return [
            Td(
                f"""
                > DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE;
                > DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE;
                > DROP TABLE IF EXISTS ten;
                > CREATE TABLE ten (f1 INTEGER);
                > CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()}
                > SELECT 1;
                /* A */
                1
                > CREATE MATERIALIZED VIEW v2 AS
                  SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2
                  FROM v1 AS a1
                  FULL OUTER JOIN v1 AS a2 USING ({columns_using});
                {inserts}
                > SELECT * FROM v2;
                /* B */
                {self.n()} {self.n()}
                """
            )
        ]
class Finish(Scenario):
    """Benchmarks around the Finish stage of query processing"""
class FinishOrderByLimit(Finish):
    """Benchmark ORDER BY + LIMIT without the benefit of an index"""

    def init(self) -> list[Action]:
        """Create and fully populate the materialized view to query."""
        return [
            self.view_ten(),
            TdAction(
                f"""
                > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}
                > SELECT COUNT(*) = {self.n()} FROM v1;
                true
                """
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time an ORDER BY ... DESC LIMIT 1 query expected to return the max value."""
        return Td(
            f"""
            > SELECT 1
            /* A */
            1
            > SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1
            /* B */
            {self.n()-1}
            """
        )
class Kafka(Scenario):
    """Benchmarks around Kafka source ingestion."""

    pass
class KafkaEnvelopeNoneBytes(Kafka):
    """Benchmark ingesting raw-bytes ENVELOPE NONE records from Kafka."""

    def shared(self) -> Action:
        """Create the topic and ingest n records of 512 bytes each (shared across runs)."""
        data = "a" * 512
        return TdAction(
            f"""
            $ kafka-create-topic topic=kafka-envelope-none-bytes

            $ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()}
            {data}
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time creating the source and waiting for all records to be ingested."""
        return Td(
            f"""
            > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}')
              FORMAT BYTES
              ENVELOPE NONE
            /* A */

            > SELECT COUNT(*) = {self.n()} FROM s1
            /* B */
            true
            """
        )
class KafkaUpsert(Kafka):
    """Benchmark ENVELOPE UPSERT ingestion where all records share one key."""

    def shared(self) -> Action:
        """Ingest n updates for key f1=1 plus a single record for key f1=2."""
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
            $ kafka-create-topic topic=kafka-upsert

            $ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
            {{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }}

            $ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}}
            {{"f1": 2}} {{"f2": 2}}
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time creating the upsert source and reading the two surviving keys."""
        return Td(
            f"""
            > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY (
              URL '${{testdrive.schema-registry-url}}'
              );

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT
            /* A */

            > SELECT f1 FROM s1
            /* B */
            1
            2
            """
        )
class KafkaUpsertUnique(Kafka):
    """Benchmark ENVELOPE UPSERT ingestion where every record has a unique key."""

    def shared(self) -> Action:
        """Ingest n records, each with a distinct key (shared across runs)."""
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
            $ kafka-create-topic topic=upsert-unique partitions=16

            $ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
            {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time source creation and full ingestion of the n unique-keyed records."""
        return Td(
            f"""
            > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
            > DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > CREATE CONNECTION IF NOT EXISTS s1_csr_conn
              TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');
            /* A */

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
              ENVELOPE UPSERT

            > SELECT COUNT(*) FROM s1;
            /* B */
            {self.n()}
            """
        )
class KafkaRestart(ScenarioDisabled):
    """This scenario dates from the pre-persistence era where the entire topic was re-ingested from scratch.
    With persistence however, no reingestion takes place and the scenario exhibits extreme variability.
    Instead of re-ingestion, we are measuring mostly the speed of COUNT(*), further obscured by
    the one second timestamp granularity
    """

    def shared(self) -> Action:
        """Create the topic and ingest n uniquely-keyed records (shared across runs)."""
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
            $ kafka-create-topic topic=kafka-recovery partitions=8

            $ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
            {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
            """
        )

    def init(self) -> Action:
        """Create the source, wait for full ingestion, and let background work settle."""
        return TdAction(
            f"""
            > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
            > DROP CONNECTION IF EXISTS s1_csr_conn CASCADE
            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > CREATE CONNECTION IF NOT EXISTS s1_csr_conn
              TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}');

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
              ENVELOPE UPSERT;

            # Make sure we are fully caught up before continuing
            > SELECT COUNT(*) FROM s1;
            {self.n()}

            # Give time for any background tasks (e.g. compaction) to settle down
            > SELECT mz_unsafe.mz_sleep(10)
            <null>
            """
        )

    def benchmark(self) -> BenchmarkingSequence:
        """Restart Materialize, then time how long until the source count is visible again."""
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
                > SELECT COUNT(*) /* {self.n()} */ FROM s1;
                /* B */
                {self.n()}
                """
            ),
        ]
class KafkaRestartBig(ScenarioBig):
    """Ingest 100M records without constructing
    a dataflow that would keep all of them in memory. For the purpose, we
    emit a bunch of "EOF" records after the primary ingestion is complete
    and consider that the source has caught up when all the EOF records have
    been seen.
    """

    SCALE = 8

    def shared(self) -> list[Action]:
        """Populate the topic: n random-keyed payload records followed by 256 EOF markers."""
        return [
            TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"),
            # Ingest 10 ** SCALE records
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=random",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=32",
                    "--min-message-size=32",
                    "--key-min=256",
                    # NOTE(review): the key space is [256, 256 + n**2] — presumably a
                    # deliberately sparse range to keep key collisions rare; confirm
                    # whether n**2 (rather than n) is intended.
                    f"--key-max={256+(self.n()**2)}",
                ],
            ),
            # Add 256 EOF markers with key values <= 256.
            # This high number is chosen as to guarantee that there will be an EOF marker
            # in each partition, even if the number of partitions is increased in the future.
            Kgen(
                topic="kafka-recovery-big",
                args=[
                    "--keys=sequential",
                    "--num-records=256",
                    "--values=bytes",
                    "--min-message-size=32",
                    "--max-message-size=32",
                ],
            ),
        ]

    def init(self) -> Action:
        """Create the source and wait until all 256 EOF markers have been ingested."""
        return TdAction(
            f"""
            >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}')
              KEY FORMAT BYTES
              VALUE FORMAT BYTES
              ENVELOPE UPSERT;

            # Confirm that all the EOF markers generated above have been processed
            > CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1 WHERE key <= '\\x00000000000000ff'

            > SELECT * FROM s1_is_complete;
            true
            """
        )

    def benchmark(self) -> BenchmarkingSequence:
        """Restart Materialize, then time until the EOF-completeness view reads true."""
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                """
                > SELECT * FROM s1_is_complete
                /* B */
                true
                """
            ),
        ]
# Hide the scale-parameterized variants generated by @parameterized_class below
# from the pdoc-generated documentation.
for i in [5, 6, 7, 8, 9]:
    __pdoc__[f"KafkaEnvelopeNoneBytesScalability_scale_{i}"] = False
@parameterized_class(
    [{"SCALE": i} for i in [5, 6, 7, 8, 9]], class_name_func=Scenario.name_with_scale
)
class KafkaEnvelopeNoneBytesScalability(ScenarioBig):
    """Run the same scenario across different scales. Do not materialize the entire
    source but rather just a non-memory-consuming view on top of it.
    """

    def shared(self) -> list[Action]:
        """Create the topic and ingest n 100-byte records (shared across runs)."""
        return [
            TdAction(
                """
                $ kafka-create-topic topic=kafka-scalability partitions=8
                """
            ),
            Kgen(
                topic="kafka-scalability",
                args=[
                    "--keys=sequential",
                    f"--num-records={self.n()}",
                    "--values=bytes",
                    "--max-message-size=100",
                    "--min-message-size=100",
                ],
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        """Time source creation plus full ingestion, observed via a COUNT(*) view."""
        return Td(
            f"""
            > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE
            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}')
              KEY FORMAT BYTES
              VALUE FORMAT BYTES
              ENVELOPE NONE
            /* A */

            > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1;

            > SELECT c = {self.n()} FROM v1
            /* B */
            true
            """
        )
class Sink(Scenario):
    """Benchmarks around Kafka sink performance."""

    pass
class ExactlyOnce(Sink):
    """Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited
    means to figure out when the complete output has been emitted, we have no option but to re-ingest
    the data to determine completion.
    """

    def shared(self) -> Action:
        """Create the input topic and ingest n uniquely-keyed records (shared across runs)."""
        return TdAction(
            self.keyschema()
            + self.schema()
            + f"""
            $ kafka-create-topic topic=sink-input partitions=16

            $ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
            {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
            """
        )

    def init(self) -> Action:
        """Create the input source and wait until it is fully ingested."""
        return TdAction(
            f"""
            >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
            >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

            > DROP CLUSTER IF EXISTS source_cluster CASCADE

            > CREATE CONNECTION IF NOT EXISTS csr_conn
              FOR CONFLUENT SCHEMA REGISTRY
              URL '${{testdrive.schema-registry-url}}';

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE source1
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}')
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT;

            > SELECT COUNT(*) FROM source1;
            {self.n()}
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time sink creation plus full emission, verified by re-ingesting the output topic."""
        return Td(
            f"""
            > DROP SINK IF EXISTS sink1;
            > DROP SOURCE IF EXISTS sink1_check CASCADE;
            /* A */

            > DROP CLUSTER IF EXISTS sink_cluster CASCADE
            > CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SINK sink1
              IN CLUSTER sink_cluster
              FROM source1
              INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
              KEY (f1)
              FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE DEBEZIUM

            $ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true

            # Wait until all the records have been emited from the sink, as observed by the sink1_check source

            > CREATE SOURCE sink1_check
              IN CLUSTER source_cluster
              FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
              KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
              ENVELOPE UPSERT;

            > CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check;

            > SELECT * FROM sink1_check_v
            /* B */
            """
            + str(self.n())
        )
class PgCdc(Scenario):
    """Benchmarks around Postgres CDC source ingestion."""

    pass
class PgCdcInitialLoad(PgCdc):
    """Measure the time it takes to read 1M existing records from Postgres
    when creating a materialized source"""

    def shared(self) -> Action:
        """Set up Postgres: replication, publication, and the n-row pk_table (shared across runs)."""
        return TdAction(
            f"""
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            ALTER USER postgres WITH replication;
            DROP SCHEMA IF EXISTS public CASCADE;
            CREATE SCHEMA public;

            DROP PUBLICATION IF EXISTS mz_source;
            CREATE PUBLICATION mz_source FOR ALL TABLES;

            CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
            INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x;
            ALTER TABLE pk_table REPLICA IDENTITY FULL;
            """
        )

    def before(self) -> Action:
        """Drop any leftover source and cluster before each measurement."""
        return TdAction(
            """
            > DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE;
            > DROP CLUSTER IF EXISTS source_cluster CASCADE
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Time source creation plus the initial snapshot of pk_table."""
        return Td(
            f"""
            > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'

            > CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
              HOST postgres,
              DATABASE postgres,
              USER postgres,
              PASSWORD SECRET pgpass
              )

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE mz_source_pgcdc
              IN CLUSTER source_cluster
              FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source')
              FOR TABLES ("pk_table")
            /* A */

            > SELECT count(*) FROM pk_table
            /* B */
            {self.n()}
            """
        )
class PgCdcStreaming(PgCdc):
    """Measure the time it takes to ingest records from Postgres post-snapshot"""

    SCALE = 5

    def shared(self) -> Action:
        """Set up Postgres replication and the publication (shared across runs)."""
        return TdAction(
            """
            $ postgres-execute connection=postgres://postgres:postgres@postgres
            ALTER USER postgres WITH replication;
            DROP SCHEMA IF EXISTS public CASCADE;
            CREATE SCHEMA public;

            DROP PUBLICATION IF EXISTS p1;
            CREATE PUBLICATION p1 FOR ALL TABLES;
            """
        )

    def before(self) -> Action:
        """Recreate the Postgres table and the Materialize source before each measurement."""
        return TdAction(
            f"""
            > DROP SOURCE IF EXISTS s1 CASCADE;
            > DROP CLUSTER IF EXISTS source_cluster CASCADE;

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            DROP TABLE IF EXISTS t1;
            CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
            ALTER TABLE t1 REPLICA IDENTITY FULL;

            > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres'

            > CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES (
              HOST postgres,
              DATABASE postgres,
              USER postgres,
              PASSWORD SECRET pgpass
              )

            > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

            > CREATE SOURCE s1
              IN CLUSTER source_cluster
              FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1')
              FOR TABLES ("t1")
            """
        )

    def benchmark(self) -> MeasurementSource:
        """Insert n rows in 1000 committed batches and time until they are all visible.

        Uses integer division so the SQL receives an integer bound (previously
        `self.n()/1000` injected a float literal such as 100.0, silently selecting
        the numeric overload of generate_series).
        """
        insertions = "\n".join(
            [
                f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {self.n() // 1000}) as x;\nCOMMIT;"
                for _ in range(0, 1000)
            ]
        )
        return Td(
            f"""
            > SELECT 1;
            /* A */
            1

            $ postgres-execute connection=postgres://postgres:postgres@postgres
            {insertions}

            > SELECT count(*) FROM t1
            /* B */
            {self.n()}
            """
        )
class MySqlCdc(Scenario):
    """Base class grouping the benchmark scenarios that ingest via MySQL CDC sources."""

    pass
class MySqlInitialLoad(MySqlCdc):
    """Measure the time it takes to read 1M existing records from MySQL
    when creating a materialized source"""

    FIXED_SCALE = True  # TODO: Remove when #25323 is fixed

    def shared(self) -> Action:
        # Populate MySQL once: a self cross join of mysql.time_zone supplies
        # enough rows for the LIMIT to reach n().
        setup = f"""
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
SET @i:=0;
CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT);
INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()};
"""
        return TdAction(setup)

    def before(self) -> Action:
        # Remove the source and its cluster so every run snapshots from scratch.
        cleanup = """
> DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE
"""
        return TdAction(cleanup)

    def benchmark(self) -> MeasurementSource:
        # Timed section: source creation (A) through full row count visible (B).
        script = f"""
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE mz_source_mysqlcdc
IN CLUSTER source_cluster
FROM MYSQL CONNECTION mysql_conn
FOR TABLES (public.pk_table)
/* A */
> SELECT count(*) FROM pk_table
/* B */
{self.n()}
"""
        return Td(script)
class MySqlStreaming(MySqlCdc):
    """Measure the time it takes to ingest records from MySQL post-snapshot"""

    SCALE = 5

    @classmethod
    def can_run(cls, version: MzVersion) -> bool:
        # MySQL sources only exist from v0.88.0 onward.
        return version >= MzVersion.parse_mz("v0.88.0-dev")

    def shared(self) -> Action:
        # One-time MySQL setup: a fresh, empty `public` database.
        setup = """
$ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
"""
        return TdAction(setup)

    def before(self) -> Action:
        # Recreate the upstream table and the source so the snapshot is empty
        # and only post-snapshot streaming is measured.
        prelude = f"""
> DROP SOURCE IF EXISTS s1 CASCADE;
> DROP CLUSTER IF EXISTS source_cluster CASCADE;
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}
$ mysql-execute name=mysql
DROP DATABASE IF EXISTS public;
CREATE DATABASE public;
USE public;
DROP TABLE IF EXISTS t1;
CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT);
> CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}'
> CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL (
HOST mysql,
USER root,
PASSWORD SECRET mysqlpass
)
> CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE s1
IN CLUSTER source_cluster
FROM MYSQL CONNECTION mysql_conn
FOR TABLES (public.t1)
"""
        return TdAction(prelude)

    def benchmark(self) -> MeasurementSource:
        # 1000 identical transactions of ~n()/1000 rows each; the same
        # statement block repeated, so build it once and replicate it.
        single_txn = dedent(
            f"""
            SET @i:=0;
            INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {round(self.n()/1000)};
            COMMIT;
            """
        )
        insertions = "\n".join([single_txn] * 1000)
        return Td(
            f"""
> SELECT 1;
/* A */
1
$ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}}
$ mysql-execute name=mysql
USE public;
{insertions}
> SELECT count(*) FROM t1
/* B */
{self.n()}
"""
        )
class Coordinator(Scenario):
    """Feature benchmarks pertaining to the coordinator, e.g. query serving
    and connection establishment latency."""
class QueryLatency(Coordinator):
    # The docstring must be the first statement of the class body; previously
    # it was placed after SCALE, making it a dead string expression that never
    # populated __doc__.
    """Measure the time it takes to run SELECT 1 queries"""

    SCALE = 3

    def benchmark(self) -> MeasurementSource:
        """Run n() SELECT 1 statements inside one transaction between the
        A and B markers; introspection-query auto-routing is disabled so
        all statements hit the same (coordinator) path."""
        selects = "\n".join("> SELECT 1\n1\n" for i in range(0, self.n()))
        return Td(
            f"""
> SET auto_route_introspection_queries TO false
> BEGIN
> SELECT 1;
/* A */
1
{selects}
> SELECT 1;
/* B */
1
"""
        )
class ConnectionLatency(Coordinator):
    """Measure the time it takes to establish connections to Mz"""

    # Many connections * many measurements = TCP port exhaustion
    SCALE = 2

    def benchmark(self) -> MeasurementSource:
        # Each $ postgres-execute opens a brand-new connection; repeat n() times.
        one_connection = """
$ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr}
SELECT 1;
"""
        connections = "\n".join(one_connection for _ in range(0, self.n()))
        script = f"""
> SET auto_route_introspection_queries TO false
> BEGIN
> SELECT 1;
/* A */
1
{connections}
> SELECT 1;
/* B */
1
"""
        return Td(script)
class Startup(Scenario):
    """Base class grouping the Mz restart/startup-time benchmark scenarios."""

    pass
class StartupEmpty(Startup):
    """Measure the time it takes to restart an empty Mz instance."""

    def benchmark(self) -> BenchmarkingSequence:
        # Restart Mz, then time until a trivial query answers (the B marker).
        restart = Lambda(lambda e: e.RestartMz())
        probe = Td(
            """
> SELECT 1;
/* B */
1
"""
        )
        return [restart, probe]
class StartupLoaded(Scenario):
    """Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something"""

    SCALE = 1.2  # 25 objects of each kind
    FIXED_SCALE = (
        True  # Can not scale to 100s of objects, so --size=+N will have no effect
    )

    def shared(self) -> Action:
        # One Kafka topic with a single record; every source created in init()
        # reads from this same topic.
        return TdAction(
            self.schema()
            + """
$ kafka-create-topic topic=startup-time
$ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1
{"f2": 1}
"""
        )

    def init(self) -> Action:
        # n() tables, each populated with one default row.
        create_tables = "\n".join(
            f"> CREATE TABLE t{i} (f1 INTEGER);\n> INSERT INTO t{i} DEFAULT VALUES;"
            for i in range(0, self.n())
        )
        # n() Kafka sources, each on its own single-replica cluster.
        create_sources = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS source{i}_cluster CASCADE;
> CREATE CLUSTER source{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE source{i}
IN CLUSTER source{i}_cluster
FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
ENVELOPE NONE
"""
            for i in range(0, self.n())
        )
        # A multi-way LEFT JOIN clause shared by all materialized views below;
        # its width grows with ceil(scale()).
        join = " ".join(
            f"LEFT JOIN source{i} USING (f2)" for i in range(1, (ceil(self.scale())))
        )
        # n() materialized views, each joining its source against the others.
        create_views = "\n".join(
            f"> CREATE MATERIALIZED VIEW v{i} AS SELECT * FROM source{i} AS s {join} LIMIT {i+1}"
            for i in range(0, self.n())
        )
        # n() sinks, each on its own cluster, writing to a shared output topic.
        create_sinks = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS sink{i}_cluster;
> CREATE CLUSTER sink{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink{i}
IN CLUSTER sink{i}_cluster
FROM source{i}
INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
KEY (f2)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
ENVELOPE DEBEZIUM
"""
            for i in range(0, self.n())
        )
        # Raise the relevant system limits first, since the object counts
        # created below exceed the defaults; then create everything.
        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}
$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};
> DROP OWNED BY materialize CASCADE;
>[version<7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');
>[version>=7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);
> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
FOR CONFLUENT SCHEMA REGISTRY
URL '${{testdrive.schema-registry-url}}';
{create_tables}
{create_sources}
{create_views}
{create_sinks}
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        # After the restart, querying every object forces all dataflows to
        # (re)hydrate; B fires only once everything can answer.
        check_tables = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM t{i}\ntrue" for i in range(0, self.n())
        )
        check_sources = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM source{i}\ntrue" for i in range(0, self.n())
        )
        check_views = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM v{i}\ntrue" for i in range(0, self.n())
        )
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
{check_views}
{check_sources}
{check_tables}
> SELECT 1;
/* B */
1
"""
            ),
        ]
class HydrateIndex(Scenario):
    """Measure the time it takes for an index to hydrate when a cluster comes online."""

    def init(self) -> list[Action]:
        # table_ten() supplies the helper table used elsewhere; idx_cluster is
        # the cluster whose hydration time is being measured.
        return [
            self.table_ten(),
            TdAction(
                """
> CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        # Build the table and the index while idx_cluster has zero replicas, so
        # the inserts/updates pile up unprocessed.  Timing runs from A (replica
        # count still 0) to B (first successful query served by the freshly
        # hydrated index).  The EXPLAIN check confirms the query actually uses
        # index i1 on idx_cluster rather than some other plan.
        sql = f"""
> DROP TABLE IF EXISTS t1 CASCADE
> CREATE TABLE t1 (f1 INTEGER, f2 INTEGER)
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0)
> CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1)
> INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()}
> UPDATE t1 SET f1 = f1 + 100000
> UPDATE t1 SET f1 = f1 + 1000000
> UPDATE t1 SET f1 = f1 + 10000000
> UPDATE t1 SET f1 = f1 + 100000000
> UPDATE t1 SET f1 = f1 + 1000000000
> SELECT 1
/* A */
1
> ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1)
> SET CLUSTER = idx_cluster
? EXPLAIN SELECT COUNT(*) FROM t1
Explained Query:
Return // {{ arity: 1 }}
Union // {{ arity: 1 }}
Get l0 // {{ arity: 1 }}
Map (0) // {{ arity: 1 }}
Union // {{ arity: 0 }}
Negate // {{ arity: 0 }}
Project () // {{ arity: 0 }}
Get l0 // {{ arity: 1 }}
Constant // {{ arity: 0 }}
- ()
With
cte l0 =
Reduce aggregates=[count(*)] // {{ arity: 1 }}
Project () // {{ arity: 0 }}
ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }}
Used Indexes:
- materialize.public.i1 (*** full scan ***)
Target cluster: idx_cluster
> SELECT COUNT(*) FROM t1
/* B */
{self._n}
> SET CLUSTER = default
"""
        # Older versions print EXPLAIN output without the arity annotations /
        # the "Target cluster" line, so strip them from the expected output.
        if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"):
            sql = remove_arity_information_from_explain(sql)
        if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"):
            sql = remove_target_cluster_from_explain(sql)
        return Td(sql)
def remove_arity_information_from_explain(sql: str) -> str:
    """Strip the " // { arity: N }" annotations that newer EXPLAIN output adds."""
    arity_annotation = r" // { arity: \d+ }"
    return re.sub(arity_annotation, "", sql)
def remove_target_cluster_from_explain(sql: str) -> str:
    """Drop the "Target cluster: ..." line that newer EXPLAIN output appends."""
    target_cluster_line = r"\n\s*Target cluster: \w+\n"
    return re.sub(target_cluster_line, "", sql)
class SwapSchema(Scenario):
    """Measure ALTER SCHEMA ... SWAP latency when both schemas carry
    dependent views."""

    SCALE = 2
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        # blue/green each get n() views on their own table; the "noise" schema
        # adds views depending (directly and transitively) on blue.v0, giving
        # the catalog extra dependencies to rewrite during the swap.
        blue_views_on_table = "\n".join(
            f"> CREATE VIEW blue.v{idx} AS SELECT * FROM blue.t1;"
            for idx in range(self.n())
        )
        green_views_on_table = "\n".join(
            f"> CREATE VIEW green.v{idx} AS SELECT * FROM green.t1;"
            for idx in range(self.n())
        )
        noise_views_on_blue_view = "\n".join(
            f"> CREATE VIEW noise.v{idx} AS SELECT * FROM blue.v0;"
            for idx in range(self.n())
        )
        noise_views_on_noise_view = "\n".join(
            f"> CREATE VIEW noise.extra_v{idx} AS SELECT * FROM noise.v0;"
            for idx in range(self.n())
        )
        setup = f"""
> CREATE SCHEMA blue;
> CREATE SCHEMA green;
> CREATE SCHEMA noise;
> CREATE TABLE blue.t1 (a int, b text);
> CREATE TABLE green.t1 (a int, b text);
{blue_views_on_table}
{green_views_on_table}
{noise_views_on_blue_view}
{noise_views_on_noise_view}
"""
        return [TdAction(setup)]

    def benchmark(self) -> MeasurementSource:
        # Timed section: a single schema swap between two trivial probes.
        script = dedent(
            """
            > SELECT 1;
            /* A */
            1
            > ALTER SCHEMA blue SWAP WITH green;
            > SELECT 1;
            /* B */
            1
            """
        )
        return Td(script)
Functions
def remove_arity_information_from_explain(sql: str) ‑> str
-
Expand source code Browse git
def remove_arity_information_from_explain(sql: str) -> str: return re.sub(r" // { arity: \d+ }", "", sql)
def remove_target_cluster_from_explain(sql: str) ‑> str
-
Expand source code Browse git
def remove_target_cluster_from_explain(sql: str) -> str: return re.sub(r"\n\s*Target cluster: \w+\n", "", sql)
Classes
class AccumulateReductions (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark the accumulation of reductions.
Expand source code Browse git
class AccumulateReductions(Dataflow): """Benchmark the accumulation of reductions.""" def before(self) -> Action: return TdAction( """ > DROP TABLE IF EXISTS t CASCADE; > CREATE TABLE t (a int, b int, c int, d int); > CREATE MATERIALIZED VIEW data AS SELECT a, a AS b FROM generate_series(1, 10000000) AS a UNION ALL SELECT a, b FROM t; > INSERT INTO t (a, b) VALUES (1, 1); > INSERT INTO t (a, b) VALUES (0, 0); > DROP CLUSTER IF EXISTS idx_cluster CASCADE; > CREATE CLUSTER idx_cluster SIZE '1-8G', REPLICATION FACTOR 1; > CREATE VIEW accumulable AS SELECT a, sum(a) AS sum_a, count(a) as cnt_a, sum(b) AS sum_b, count(b) as cnt_b FROM data GROUP BY a; """ ) def benchmark(self) -> MeasurementSource: sql = """ > SELECT 1 /* A */ 1 > CREATE INDEX i_accumulable IN CLUSTER idx_cluster ON accumulable(a); > SET CLUSTER = idx_cluster; ? EXPLAIN SELECT count(*) FROM accumulable; Explained Query: Return // { arity: 1 } Union // { arity: 1 } Get l0 // { arity: 1 } Map (0) // { arity: 1 } Union // { arity: 0 } Negate // { arity: 0 } Project () // { arity: 0 } Get l0 // { arity: 1 } Constant // { arity: 0 } - () With cte l0 = Reduce aggregates=[count(*)] // { arity: 1 } Project () // { arity: 0 } ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 } Used Indexes: - materialize.public.i_accumulable (*** full scan ***) Target cluster: idx_cluster > SELECT count(*) FROM accumulable; /* B */ 10000001 > SET CLUSTER = default; """ if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"): sql = remove_arity_information_from_explain(sql) if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"): sql = remove_target_cluster_from_explain(sql) return Td(sql)
Ancestors
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action: return TdAction( """ > DROP TABLE IF EXISTS t CASCADE; > CREATE TABLE t (a int, b int, c int, d int); > CREATE MATERIALIZED VIEW data AS SELECT a, a AS b FROM generate_series(1, 10000000) AS a UNION ALL SELECT a, b FROM t; > INSERT INTO t (a, b) VALUES (1, 1); > INSERT INTO t (a, b) VALUES (0, 0); > DROP CLUSTER IF EXISTS idx_cluster CASCADE; > CREATE CLUSTER idx_cluster SIZE '1-8G', REPLICATION FACTOR 1; > CREATE VIEW accumulable AS SELECT a, sum(a) AS sum_a, count(a) as cnt_a, sum(b) AS sum_b, count(b) as cnt_b FROM data GROUP BY a; """ )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: sql = """ > SELECT 1 /* A */ 1 > CREATE INDEX i_accumulable IN CLUSTER idx_cluster ON accumulable(a); > SET CLUSTER = idx_cluster; ? EXPLAIN SELECT count(*) FROM accumulable; Explained Query: Return // { arity: 1 } Union // { arity: 1 } Get l0 // { arity: 1 } Map (0) // { arity: 1 } Union // { arity: 0 } Negate // { arity: 0 } Project () // { arity: 0 } Get l0 // { arity: 1 } Constant // { arity: 0 } - () With cte l0 = Reduce aggregates=[count(*)] // { arity: 1 } Project () // { arity: 0 } ReadIndex on=accumulable i_accumulable=[*** full scan ***] // { arity: 5 } Used Indexes: - materialize.public.i_accumulable (*** full scan ***) Target cluster: idx_cluster > SELECT count(*) FROM accumulable; /* B */ 10000001 > SET CLUSTER = default; """ if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"): sql = remove_arity_information_from_explain(sql) if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"): sql = remove_target_cluster_from_explain(sql) return Td(sql)
Inherited members
class ConnectionLatency (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to establish connections to Mz
Expand source code Browse git
class ConnectionLatency(Coordinator): """Measure the time it takes to establish connections to Mz""" SCALE = 2 # Many connections * many measurements = TCP port exhaustion def benchmark(self) -> MeasurementSource: connections = "\n".join( """ $ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr} SELECT 1; """ for i in range(0, self.n()) ) return Td( f""" > SET auto_route_introspection_queries TO false > BEGIN > SELECT 1; /* A */ 1 {connections} > SELECT 1; /* B */ 1 """ )
Ancestors
Class variables
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: connections = "\n".join( """ $ postgres-execute connection=postgres://materialize:materialize@${testdrive.materialize-sql-addr} SELECT 1; """ for i in range(0, self.n()) ) return Td( f""" > SET auto_route_introspection_queries TO false > BEGIN > SELECT 1; /* A */ 1 {connections} > SELECT 1; /* B */ 1 """ )
Inherited members
class Coordinator (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks pertaining to the coordinator.
Expand source code Browse git
class Coordinator(Scenario): """Feature benchmarks pertaining to the coordinator."""
Ancestors
Subclasses
Inherited members
class CountDistinct (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class CountDistinct(Dataflow): def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT COUNT(DISTINCT f1) AS f1 FROM v1 /* B */ {self.n()} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT COUNT(DISTINCT f1) AS f1 FROM v1 /* B */ {self.n()} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()}; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class CreateIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for CREATE INDEX to return plus the time it takes for a SELECT query that would use the index to return rows.
Expand source code Browse git
class CreateIndex(Dataflow): """Measure the time it takes for CREATE INDEX to return *plus* the time it takes for a SELECT query that would use the index to return rows. """ def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER); > INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()} # Make sure the dataflow is fully hydrated > SELECT 1 FROM t1 WHERE f1 = 0; 1 """ ), ] def benchmark(self) -> MeasurementSource: return Td( """ > DROP INDEX IF EXISTS i1; /* A */ > CREATE INDEX i1 ON t1(f1); > SELECT COUNT(*) FROM t1 AS a1, t1 AS a2 WHERE a1.f1 = a2.f1 AND a1.f1 = 0 AND a2.f1 = 0 /* B */ 1 """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( """ > DROP INDEX IF EXISTS i1; /* A */ > CREATE INDEX i1 ON t1(f1); > SELECT COUNT(*) FROM t1 AS a1, t1 AS a2 WHERE a1.f1 = a2.f1 AND a1.f1 = 0 AND a2.f1 = 0 /* B */ 1 """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER); > INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()} # Make sure the dataflow is fully hydrated > SELECT 1 FROM t1 WHERE f1 = 0; 1 """ ), ]
Inherited members
class CrossJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class CrossJoin(Dataflow): def init(self) -> Action: return self.view_ten() def benchmark(self) -> MeasurementSource: return Td( f""" > DROP MATERIALIZED VIEW IF EXISTS v1; > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()} /* A */ > SELECT COUNT(*) = {self.n()} AS f1 FROM v1; /* B */ true """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP MATERIALIZED VIEW IF EXISTS v1; > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()} /* A */ > SELECT COUNT(*) = {self.n()} AS f1 FROM v1; /* B */ true """ )
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return self.view_ten()
Inherited members
class DML (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmarks around the performance of DML statements
Expand source code Browse git
class DML(Scenario): """Benchmarks around the performance of DML statements""" pass
Ancestors
Subclasses
- Insert
- InsertAndSelect
- InsertBatch
- InsertMultiRow
- ManySmallInserts
- ManySmallUpdates
- Update
- UpdateMultiNoIndex
Inherited members
class Dataflow (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class Dataflow(Scenario): """Benchmark scenarios around individual dataflow patterns/operators""" pass
Ancestors
Subclasses
- AccumulateReductions
- CountDistinct
- CreateIndex
- CrossJoin
- DeltaJoin
- DeltaJoinMaintained
- DifferentialJoin
- FullOuterJoin
- GroupBy
- GroupByMaintained
- MinMax
- MinMaxMaintained
- OrderBy
- Retraction
Inherited members
class DeltaJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class DeltaJoin(Dataflow): def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()} """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1; /* A */ 1 # Delta joins require 3+ tables > SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1 /* B */ {self.n()} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1; /* A */ 1 # Delta joins require 3+ tables > SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1 /* B */ {self.n()} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()} """ ), ]
Inherited members
class DeltaJoinMaintained (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark DeltaJoin as an indexed view with table-based data initialization, where the empty frontier is not emitted, in contrast with one-shot SELECT processing based on data initialized as a constant view
Expand source code Browse git
class DeltaJoinMaintained(Dataflow): """Benchmark DeltaJoin as an indexed view with table-based data initialization, where the empty frontier is not emitted, in contrast with one-shot SELECT processing based on data initialized as a constant view""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()} """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > DROP VIEW IF EXISTS v2; /* A */ # Delta joins require 3+ tables > CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1 > CREATE DEFAULT INDEX ON v2 > SELECT * FROM v2 /* B */ {self.n()} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP VIEW IF EXISTS v2; /* A */ # Delta joins require 3+ tables > CREATE VIEW v2 AS SELECT COUNT(*) FROM v1 AS a1 , v1 AS a2 , v1 AS a3 WHERE a1.f1 = a2.f1 AND a2.f1 = a3.f1 > CREATE DEFAULT INDEX ON v2 > SELECT * FROM v2 /* B */ {self.n()} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()} """ ), ]
Inherited members
class DifferentialJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class DifferentialJoin(Dataflow): def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1; /* A */ 1 > SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1) /* B */ {self.n()} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1; /* A */ 1 > SELECT COUNT(*) FROM v1 AS a1 JOIN v1 AS a2 USING (f1) /* B */ {self.n()} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} """ ), ]
Inherited members
class ExactlyOnce (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited means to figure out when the complete output has been emitted, we have no option but to re-ingest the data to determine completion.
Expand source code Browse git
class ExactlyOnce(Sink): """Measure the time it takes to emit 1M records to a reuse_topic=true sink. As we have limited means to figure out when the complete output has been emited, we have no option but to re-ingest the data to determine completion. """ def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=sink-input partitions=16 $ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ ) def init(self) -> Action: return TdAction( f""" >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > DROP CLUSTER IF EXISTS source_cluster CASCADE > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE source1 IN CLUSTER source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT; > SELECT COUNT(*) FROM source1; {self.n()} """ ) def benchmark(self) -> MeasurementSource: return Td( f""" > DROP SINK IF EXISTS sink1; > DROP SOURCE IF EXISTS sink1_check CASCADE; /* A */ > DROP CLUSTER IF EXISTS sink_cluster CASCADE > CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SINK sink1 IN CLUSTER sink_cluster FROM source1 INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}') KEY (f1) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM $ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true # 
Wait until all the records have been emited from the sink, as observed by the sink1_check source > CREATE SOURCE sink1_check IN CLUSTER source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}') KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT; > CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check; > SELECT * FROM sink1_check_v /* B */ """ + str(self.n()) )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP SINK IF EXISTS sink1; > DROP SOURCE IF EXISTS sink1_check CASCADE; /* A */ > DROP CLUSTER IF EXISTS sink_cluster CASCADE > CREATE CLUSTER sink_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SINK sink1 IN CLUSTER sink_cluster FROM source1 INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}') KEY (f1) FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE DEBEZIUM $ kafka-verify-topic sink=materialize.public.sink1 await-value-schema=true await-key-schema=true # Wait until all the records have been emited from the sink, as observed by the sink1_check source > CREATE SOURCE sink1_check IN CLUSTER source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}') KEY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT; > CREATE MATERIALIZED VIEW sink1_check_v AS SELECT COUNT(*) FROM sink1_check; > SELECT * FROM sink1_check_v /* B */ """ + str(self.n()) )
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return TdAction( f""" >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > DROP CLUSTER IF EXISTS source_cluster CASCADE > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE source1 IN CLUSTER source_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink-input-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT; > SELECT COUNT(*) FROM source1; {self.n()} """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=sink-input partitions=16 $ kafka-ingest format=avro topic=sink-input key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ )
Inherited members
class FastPath (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks related to the "fast path" in query execution, as described in the 'Internals of One-off Queries' presentation.
Expand source code Browse git
class FastPath(Scenario): """Feature benchmarks related to the "fast path" in query execution, as described in the 'Internals of One-off Queries' presentation. """
Ancestors
Subclasses
Inherited members
class FastPathFilterIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for the fast path to filter out all rows from a materialized view using an index and return
Expand source code Browse git
class FastPathFilterIndex(FastPath): """Measure the time it takes for the fast path to filter our all rows from a materialized view using an index and return""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()} > CREATE DEFAULT INDEX ON v1; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] # Since an individual query of this particular type being benchmarked takes 1ms to execute, the results are susceptible # to a lot of random noise. As we can not make the query any slower by using e.g. a large dataset, # we run the query 100 times in a row and measure the total execution time. def benchmark(self) -> MeasurementSource: hundred_selects = "\n".join( "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for i in range(0, 1000) ) return Td( f""" > SET auto_route_introspection_queries TO false > BEGIN > SELECT 1; /* A */ 1 {hundred_selects} > SELECT 1 /* B */ 1 """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: hundred_selects = "\n".join( "> SELECT * FROM v1 WHERE f1 = 1;\n1\n" for i in range(0, 1000) ) return Td( f""" > SET auto_route_introspection_queries TO false > BEGIN > SELECT 1; /* A */ 1 {hundred_selects} > SELECT 1 /* B */ 1 """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()} > CREATE DEFAULT INDEX ON v1; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class FastPathFilterNoIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for the fast path to filter out all rows from a materialized view and return
Expand source code Browse git
class FastPathFilterNoIndex(FastPath): """Measure the time it takes for the fast path to filter out all rows from a materialized view and return""" SCALE = 7 FIXED_SCALE = True # OOM with 10**8 = 100M records def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()}); > CREATE DEFAULT INDEX ON v1; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( """ > /* A */ SELECT 1; 1 > /* B */ SELECT * FROM v1 WHERE f2 < 0; """ )
Ancestors
Class variables
var FIXED_SCALE : bool
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( """ > /* A */ SELECT 1; 1 > /* B */ SELECT * FROM v1 WHERE f2 < 0; """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()}); > CREATE DEFAULT INDEX ON v1; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class FastPathLimit (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark the case SELECT * FROM source LIMIT <i>, optimized by #21615
Expand source code Browse git
class FastPathLimit(FastPath): """Benchmark the case SELECT * FROM source LIMIT <i> , optimized by #21615""" def init(self) -> list[Action]: return [ TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()}) """ ), ] def benchmark(self) -> MeasurementSource: return Td( dedent( """ > SELECT 1; /* A */ 1 > SELECT * FROM v1 LIMIT 100 /* B */ """ ) + "\n".join([str(x) for x in range(1, 101)]) )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( dedent( """ > SELECT 1; /* A */ 1 > SELECT * FROM v1 LIMIT 100 /* B */ """ ) + "\n".join([str(x) for x in range(1, 101)]) )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT * FROM generate_series(1, {self.n()}) """ ), ]
Inherited members
class FastPathOrderByLimit (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>
Expand source code Browse git
class FastPathOrderByLimit(FastPath): """Benchmark the case SELECT * FROM materialized_view ORDER BY <key> LIMIT <i>""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; > CREATE DEFAULT INDEX ON v1; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( """ > SELECT 1; /* A */ 1 > SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000 /* B */ """ + "\n".join([str(x) for x in range(self.n() - 1000, self.n())]) )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( """ > SELECT 1; /* A */ 1 > SELECT f1 FROM v1 ORDER BY f1 DESC LIMIT 1000 /* B */ """ + "\n".join([str(x) for x in range(self.n() - 1000, self.n())]) )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; > CREATE DEFAULT INDEX ON v1; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class Finish (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmarks around the Finish stage of query processing
Expand source code Browse git
class Finish(Scenario): """Benchmarks around the Finish stage of query processing"""
Ancestors
Subclasses
Inherited members
class FinishOrderByLimit (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark ORDER BY + LIMIT without the benefit of an index
Expand source code Browse git
class FinishOrderByLimit(Finish): """Benchmark ORDER BY + LIMIT without the benefit of an index""" def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1 /* B */ {self.n()-1} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT f2 FROM v1 ORDER BY 1 DESC LIMIT 1 /* B */ {self.n()-1} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class FullOuterJoin (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class FullOuterJoin(Dataflow): def benchmark(self) -> BenchmarkingSequence: columns_select = ", ".join( [f"a{i+1}.f1 AS f{i+1}" for i in range(0, floor(self.scale()))] ) columns_using = ", ".join([f"f{i+1}" for i in range(0, floor(self.scale()))]) inserts = "\n".join([f"> INSERT INTO ten VALUES ({i+1})" for i in range(0, 10)]) return [ Td( f""" > DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE; > DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE; > DROP TABLE IF EXISTS ten; > CREATE TABLE ten (f1 INTEGER); > CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()} > SELECT 1; /* A */ 1 > CREATE MATERIALIZED VIEW v2 AS SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2 FROM v1 AS a1 FULL OUTER JOIN v1 AS a2 USING ({columns_using}); {inserts} > SELECT * FROM v2; /* B */ {self.n()} {self.n()} """ ) ]
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
-
Expand source code Browse git
def benchmark(self) -> BenchmarkingSequence: columns_select = ", ".join( [f"a{i+1}.f1 AS f{i+1}" for i in range(0, floor(self.scale()))] ) columns_using = ", ".join([f"f{i+1}" for i in range(0, floor(self.scale()))]) inserts = "\n".join([f"> INSERT INTO ten VALUES ({i+1})" for i in range(0, 10)]) return [ Td( f""" > DROP MATERIALIZED VIEW IF EXISTS v2 CASCADE; > DROP MATERIALIZED VIEW IF EXISTS v1 CASCADE; > DROP TABLE IF EXISTS ten; > CREATE TABLE ten (f1 INTEGER); > CREATE MATERIALIZED VIEW v1 AS SELECT {columns_select} FROM {self.join()} > SELECT 1; /* A */ 1 > CREATE MATERIALIZED VIEW v2 AS SELECT COUNT(a1.f1) AS c1, COUNT(a2.f1) AS c2 FROM v1 AS a1 FULL OUTER JOIN v1 AS a2 USING ({columns_using}); {inserts} > SELECT * FROM v2; /* B */ {self.n()} {self.n()} """ ) ]
Inherited members
class GroupBy (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class GroupBy(Dataflow): def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} > SELECT COUNT(*) = {self.n()} FROM v1 true """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2) /* B */ {self.n()} 0 {self.n()-1} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2) /* B */ {self.n()} 0 {self.n()-1} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} > SELECT COUNT(*) = {self.n()} FROM v1 true """ ), ]
Inherited members
class GroupByMaintained (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark GroupBy as an indexed view, which renders a dataflow for incremental maintenance, in contrast with one-shot SELECT processing
Expand source code Browse git
class GroupByMaintained(Dataflow): """Benchmark GroupBy as an indexed view, which renders a dataflow for incremental maintenance, in contrast with one-shot SELECT processing""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} > SELECT COUNT(*) = {self.n()} FROM v1 true """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > DROP VIEW IF EXISTS v2; /* A */ > CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2) > CREATE DEFAULT INDEX ON v2 > SELECT * FROM v2 /* B */ {self.n()} 0 {self.n()-1} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP VIEW IF EXISTS v2; /* A */ > CREATE VIEW v2 AS SELECT COUNT(*), MIN(f1_min), MAX(f1_max) FROM (SELECT f2, MIN(f1) AS f1_min, MAX(f1) AS f1_max FROM v1 GROUP BY f2) > CREATE DEFAULT INDEX ON v2 > SELECT * FROM v2 /* B */ {self.n()} 0 {self.n()-1} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1, {self.unique_values()} AS f2 FROM {self.join()} > SELECT COUNT(*) = {self.n()} FROM v1 true """ ), ]
Inherited members
class HydrateIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for an index to hydrate when a cluster comes online.
Expand source code Browse git
class HydrateIndex(Scenario): """Measure the time it takes for an index to hydrate when a cluster comes online.""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( """ > CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1 """ ), ] def benchmark(self) -> MeasurementSource: sql = f""" > DROP TABLE IF EXISTS t1 CASCADE > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER) > ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0) > CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1) > INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()} > UPDATE t1 SET f1 = f1 + 100000 > UPDATE t1 SET f1 = f1 + 1000000 > UPDATE t1 SET f1 = f1 + 10000000 > UPDATE t1 SET f1 = f1 + 100000000 > UPDATE t1 SET f1 = f1 + 1000000000 > SELECT 1 /* A */ 1 > ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1) > SET CLUSTER = idx_cluster ? EXPLAIN SELECT COUNT(*) FROM t1 Explained Query: Return // {{ arity: 1 }} Union // {{ arity: 1 }} Get l0 // {{ arity: 1 }} Map (0) // {{ arity: 1 }} Union // {{ arity: 0 }} Negate // {{ arity: 0 }} Project () // {{ arity: 0 }} Get l0 // {{ arity: 1 }} Constant // {{ arity: 0 }} - () With cte l0 = Reduce aggregates=[count(*)] // {{ arity: 1 }} Project () // {{ arity: 0 }} ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }} Used Indexes: - materialize.public.i1 (*** full scan ***) Target cluster: idx_cluster > SELECT COUNT(*) FROM t1 /* B */ {self._n} > SET CLUSTER = default """ if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"): sql = remove_arity_information_from_explain(sql) if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"): sql = remove_target_cluster_from_explain(sql) return Td(sql)
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: sql = f""" > DROP TABLE IF EXISTS t1 CASCADE > CREATE TABLE t1 (f1 INTEGER, f2 INTEGER) > ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 0) > CREATE INDEX i1 IN CLUSTER idx_cluster ON t1(f1) > INSERT INTO t1 (f1) SELECT {self.unique_values()} FROM {self.join()} > UPDATE t1 SET f1 = f1 + 100000 > UPDATE t1 SET f1 = f1 + 1000000 > UPDATE t1 SET f1 = f1 + 10000000 > UPDATE t1 SET f1 = f1 + 100000000 > UPDATE t1 SET f1 = f1 + 1000000000 > SELECT 1 /* A */ 1 > ALTER CLUSTER idx_cluster SET (REPLICATION FACTOR 1) > SET CLUSTER = idx_cluster ? EXPLAIN SELECT COUNT(*) FROM t1 Explained Query: Return // {{ arity: 1 }} Union // {{ arity: 1 }} Get l0 // {{ arity: 1 }} Map (0) // {{ arity: 1 }} Union // {{ arity: 0 }} Negate // {{ arity: 0 }} Project () // {{ arity: 0 }} Get l0 // {{ arity: 1 }} Constant // {{ arity: 0 }} - () With cte l0 = Reduce aggregates=[count(*)] // {{ arity: 1 }} Project () // {{ arity: 0 }} ReadIndex on=t1 i1=[*** full scan ***] // {{ arity: 2 }} Used Indexes: - materialize.public.i1 (*** full scan ***) Target cluster: idx_cluster > SELECT COUNT(*) FROM t1 /* B */ {self._n} > SET CLUSTER = default """ if self._mz_version < MzVersion.parse_mz("v0.83.0-dev"): sql = remove_arity_information_from_explain(sql) if self._mz_version < MzVersion.parse_mz("v0.96.0-dev"): sql = remove_target_cluster_from_explain(sql) return Td(sql)
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( """ > CREATE CLUSTER idx_cluster SIZE '16', REPLICATION FACTOR 1 """ ), ]
Inherited members
class Insert (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for an INSERT statement to return.
Expand source code Browse git
class Insert(DML): """Measure the time it takes for an INSERT statement to return.""" def init(self) -> Action: return self.table_ten() def benchmark(self) -> MeasurementSource: return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()} /* B */ """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()} /* B */ """ )
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return self.table_ten()
Inherited members
class InsertAndSelect (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for an INSERT statement to return AND for a follow-up SELECT to return data, that is, for the dataflow to be completely caught up.
Expand source code Browse git
class InsertAndSelect(DML): """Measure the time it takes for an INSERT statement to return AND for a follow-up SELECT to return data, that is, for the dataflow to be completely caught up. """ def init(self) -> Action: return self.table_ten() def benchmark(self) -> MeasurementSource: return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}; > SELECT 1 FROM t1 WHERE f1 = 1 /* B */ 1 """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}; > SELECT 1 FROM t1 WHERE f1 = 1 /* B */ 1 """ )
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return self.table_ten()
Inherited members
class InsertBatch (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for a batch of INSERT statements to return.
Expand source code Browse git
class InsertBatch(DML): """Measure the time it takes for a batch of INSERT statements to return.""" SCALE = 4 def benchmark(self) -> MeasurementSource: inserts = "\n".join( f"> INSERT INTO t1 VALUES ({i});" for i in range(0, self.n()) ) return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > SET auto_route_introspection_queries TO false > BEGIN {inserts} > COMMIT /* B */ """ )
Ancestors
Class variables
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: inserts = "\n".join( f"> INSERT INTO t1 VALUES ({i});" for i in range(0, self.n()) ) return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > SET auto_route_introspection_queries TO false > BEGIN {inserts} > COMMIT /* B */ """ )
Inherited members
class InsertMultiRow (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for a single multi-row INSERT statement to return.
Expand source code Browse git
class InsertMultiRow(DML): """Measure the time it takes for a single multi-row INSERT statement to return.""" SCALE = 5 def benchmark(self) -> MeasurementSource: values = ", ".join(f"({i})" for i in range(0, self.n())) return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > INSERT INTO t1 VALUES {values} /* B */ """ )
Ancestors
Class variables
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: values = ", ".join(f"({i})" for i in range(0, self.n())) return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ > INSERT INTO t1 VALUES {values} /* B */ """ )
Inherited members
class Kafka (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class Kafka(Scenario): pass
Ancestors
Subclasses
Inherited members
class KafkaEnvelopeNoneBytes (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class KafkaEnvelopeNoneBytes(Kafka): def shared(self) -> Action: data = "a" * 512 return TdAction( f""" $ kafka-create-topic topic=kafka-envelope-none-bytes $ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()} {data} """ ) def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}') FORMAT BYTES ENVELOPE NONE /* A */ > SELECT COUNT(*) = {self.n()} FROM s1 /* B */ true """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-envelope-none-bytes-${{testdrive.seed}}') FORMAT BYTES ENVELOPE NONE /* A */ > SELECT COUNT(*) = {self.n()} FROM s1 /* B */ true """ )
-
Expand source code Browse git
def shared(self) -> Action: data = "a" * 512 return TdAction( f""" $ kafka-create-topic topic=kafka-envelope-none-bytes $ kafka-ingest format=bytes topic=kafka-envelope-none-bytes repeat={self.n()} {data} """ )
Inherited members
class KafkaEnvelopeNoneBytesScalability (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Run the same scenario across different scales. Do not materialize the entire source but rather just a non-memory-consuming view on top of it.
Expand source code Browse git
@parameterized_class( [{"SCALE": i} for i in [5, 6, 7, 8, 9]], class_name_func=Scenario.name_with_scale ) class KafkaEnvelopeNoneBytesScalability(ScenarioBig): """Run the same scenario across different scales. Do not materialize the entire source but rather just a non-memory-consuming view on top of it. """ def shared(self) -> list[Action]: return [ TdAction( """ $ kafka-create-topic topic=kafka-scalability partitions=8 """ ), Kgen( topic="kafka-scalability", args=[ "--keys=sequential", f"--num-records={self.n()}", "--values=bytes", "--max-message-size=100", "--min-message-size=100", ], ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}') KEY FORMAT BYTES VALUE FORMAT BYTES ENVELOPE NONE /* A */ > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1; > SELECT c = {self.n()} FROM v1 /* B */ true """ )
Ancestors
Subclasses
- materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_5
- materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_6
- materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_7
- materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_8
- materialize.feature_benchmark.scenarios.benchmark_main.KafkaEnvelopeNoneBytesScalability_scale_9
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-scalability-${{testdrive.seed}}') KEY FORMAT BYTES VALUE FORMAT BYTES ENVELOPE NONE /* A */ > CREATE MATERIALIZED VIEW v1 AS SELECT COUNT(*) AS c FROM s1; > SELECT c = {self.n()} FROM v1 /* B */ true """ )
-
Expand source code Browse git
def shared(self) -> list[Action]: return [ TdAction( """ $ kafka-create-topic topic=kafka-scalability partitions=8 """ ), Kgen( topic="kafka-scalability", args=[ "--keys=sequential", f"--num-records={self.n()}", "--values=bytes", "--max-message-size=100", "--min-message-size=100", ], ), ]
Inherited members
class KafkaRestart (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
This scenario dates from the pre-persistence era where the entire topic was re-ingested from scratch. With persistence however, no reingestion takes place and the scenario exhibits extreme variability. Instead of re-ingestion, we are measuring mostly the speed of COUNT(*), further obscured by the one second timestamp granularity
Expand source code Browse git
class KafkaRestart(ScenarioDisabled): """This scenario dates from the pre-persistence era where the entire topic was re-ingested from scratch. With persistence however, no reingestion takes place and the scenario exhibits extreme variability. Instead of re-ingestion, we are measuring mostly the speed of COUNT(*), further obscured by the one second timestamp granularity """ def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=kafka-recovery partitions=8 $ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ ) def init(self) -> Action: return TdAction( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CONNECTION IF EXISTS s1_csr_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS s1_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}'); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn ENVELOPE UPSERT; # Make sure we are fully caught up before continuing > SELECT COUNT(*) FROM s1; {self.n()} # Give time for any background tasks (e.g. compaction) to settle down > SELECT mz_unsafe.mz_sleep(10) <null> """ ) def benchmark(self) -> BenchmarkingSequence: return [ Lambda(lambda e: e.RestartMz()), Td( f""" > SELECT COUNT(*) /* {self.n()} */ FROM s1; /* B */ {self.n()} """ ), ]
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
-
Expand source code Browse git
def benchmark(self) -> BenchmarkingSequence: return [ Lambda(lambda e: e.RestartMz()), Td( f""" > SELECT COUNT(*) /* {self.n()} */ FROM s1; /* B */ {self.n()} """ ), ]
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return TdAction( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CONNECTION IF EXISTS s1_csr_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS s1_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}'); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn ENVELOPE UPSERT; # Make sure we are fully caught up before continuing > SELECT COUNT(*) FROM s1; {self.n()} # Give time for any background tasks (e.g. compaction) to settle down > SELECT mz_unsafe.mz_sleep(10) <null> """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=kafka-recovery partitions=8 $ kafka-ingest format=avro topic=kafka-recovery key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ )
Inherited members
class KafkaRestartBig (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Ingest 100M records without constructing a dataflow that would keep all of them in memory. For the purpose, we emit a bunch of "EOF" records after the primary ingestion is complete and consider that the source has caught up when all the EOF records have been seen.
Expand source code Browse git
class KafkaRestartBig(ScenarioBig): """Ingest 100M records without constructing a dataflow that would keep all of them in memory. For the purpose, we emit a bunch of "EOF" records after the primary ingestion is complete and consider that the source has caught up when all the EOF records have been seen. """ SCALE = 8 def shared(self) -> list[Action]: return [ TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"), # Ingest 10 ** SCALE records Kgen( topic="kafka-recovery-big", args=[ "--keys=random", f"--num-records={self.n()}", "--values=bytes", "--max-message-size=32", "--min-message-size=32", "--key-min=256", f"--key-max={256+(self.n()**2)}", ], ), # Add 256 EOF markers with key values <= 256. # This high number is chosen as to guarantee that there will be an EOF marker # in each partition, even if the number of partitions is increased in the future. Kgen( topic="kafka-recovery-big", args=[ "--keys=sequential", "--num-records=256", "--values=bytes", "--min-message-size=32", "--max-message-size=32", ], ), ] def init(self) -> Action: return TdAction( f""" >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > DROP CLUSTER IF EXISTS source_cluster CASCADE > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}') KEY FORMAT BYTES VALUE FORMAT BYTES ENVELOPE UPSERT; # Confirm that all the EOF markers generated above have been processed > CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1 WHERE key <= '\\x00000000000000ff' > SELECT * FROM s1_is_complete; true """ ) def benchmark(self) -> BenchmarkingSequence: return [ Lambda(lambda e: e.RestartMz()), Td( """ > SELECT * FROM s1_is_complete /* B */ true """ 
), ]
Ancestors
Class variables
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
-
Expand source code Browse git
def benchmark(self) -> BenchmarkingSequence: return [ Lambda(lambda e: e.RestartMz()), Td( """ > SELECT * FROM s1_is_complete /* B */ true """ ), ]
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return TdAction( f""" >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > DROP CLUSTER IF EXISTS source_cluster CASCADE > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-recovery-big-${{testdrive.seed}}') KEY FORMAT BYTES VALUE FORMAT BYTES ENVELOPE UPSERT; # Confirm that all the EOF markers generated above have been processed > CREATE MATERIALIZED VIEW s1_is_complete AS SELECT COUNT(*) = 256 FROM s1 WHERE key <= '\\x00000000000000ff' > SELECT * FROM s1_is_complete; true """ )
-
Expand source code Browse git
def shared(self) -> list[Action]: return [ TdAction("$ kafka-create-topic topic=kafka-recovery-big partitions=8"), # Ingest 10 ** SCALE records Kgen( topic="kafka-recovery-big", args=[ "--keys=random", f"--num-records={self.n()}", "--values=bytes", "--max-message-size=32", "--min-message-size=32", "--key-min=256", f"--key-max={256+(self.n()**2)}", ], ), # Add 256 EOF markers with key values <= 256. # This high number is chosen as to guarantee that there will be an EOF marker # in each partition, even if the number of partitions is increased in the future. Kgen( topic="kafka-recovery-big", args=[ "--keys=sequential", "--num-records=256", "--values=bytes", "--min-message-size=32", "--max-message-size=32", ], ), ]
Inherited members
class KafkaUpsert (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class KafkaUpsert(Kafka): def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=kafka-upsert $ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }} $ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} {{"f1": 2}} {{"f2": 2}} """ ) def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL '${{testdrive.schema-registry-url}}' ); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT /* A */ > SELECT f1 FROM s1 /* B */ 1 2 """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( URL '${{testdrive.schema-registry-url}}' ); > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-kafka-upsert-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn ENVELOPE UPSERT /* A */ > SELECT f1 FROM s1 /* B */ 1 2 """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=kafka-upsert $ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": 1}} {{"f2": ${{kafka-ingest.iteration}} }} $ kafka-ingest format=avro topic=kafka-upsert key-format=avro key-schema=${{keyschema}} schema=${{schema}} {{"f1": 2}} {{"f2": 2}} """ )
Inherited members
class KafkaUpsertUnique (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class KafkaUpsertUnique(Kafka): def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=upsert-unique partitions=16 $ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ ) def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CONNECTION IF EXISTS s1_csr_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS s1_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}'); /* A */ > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn ENVELOPE UPSERT > SELECT COUNT(*) FROM s1; /* B */ {self.n()} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP CONNECTION IF EXISTS s1_kafka_conn CASCADE > DROP CONNECTION IF EXISTS s1_csr_conn CASCADE > DROP CLUSTER IF EXISTS source_cluster CASCADE >[version<7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CONNECTION IF NOT EXISTS s1_csr_conn TO CONFLUENT SCHEMA REGISTRY (URL '${{testdrive.schema-registry-url}}'); /* A */ > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-upsert-unique-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn ENVELOPE UPSERT > SELECT COUNT(*) FROM s1; /* B */ {self.n()} """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( self.keyschema() + self.schema() + f""" $ kafka-create-topic topic=upsert-unique partitions=16 $ kafka-ingest format=avro topic=upsert-unique key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ )
Inherited members
class MFPPushdown (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Test MFP pushdown – WHERE clause with a suitable condition and no index defined.
Expand source code Browse git
class MFPPushdown(Scenario): """Test MFP pushdown -- WHERE clause with a suitable condition and no index defined.""" SCALE = 7 FIXED_SCALE = True # OOM with 10**8 = 100M records def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()}); > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( """ > /* A */ SELECT 1; 1 > /* B */ SELECT * FROM v1 WHERE f2 < 0; """ )
Ancestors
Class variables
var FIXED_SCALE : bool
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( """ > /* A */ SELECT 1; 1 > /* B */ SELECT * FROM v1 WHERE f2 < 0; """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 (f1, f2) AS SELECT generate_series AS f1, 1 AS f2 FROM generate_series(1, {self.n()}); > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class ManySmallInserts (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for several small INSERT statements to return.
Expand source code Browse git
class ManySmallInserts(DML): """Measure the time it takes for several small INSERT statements to return.""" def init(self) -> Action: return self.table_ten() def benchmark(self) -> MeasurementSource: random.seed(self.seed()) statements = [] for _ in range(0, 10000): statements.append(f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})") insert_statements_str = "\n".join(statements) return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ {insert_statements_str} /* B */ """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: random.seed(self.seed()) statements = [] for _ in range(0, 10000): statements.append(f"> INSERT INTO t1 VALUES ({random.randint(0, 100000)})") insert_statements_str = "\n".join(statements) return Td( f""" > DROP TABLE IF EXISTS t1; > CREATE TABLE t1 (f1 INTEGER) /* A */ {insert_statements_str} /* B */ """ )
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: return self.table_ten()
Inherited members
class ManySmallUpdates (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for several small UPDATE statements to return to client
Expand source code Browse git
class ManySmallUpdates(DML): """Measure the time it takes for several small UPDATE statements to return to client""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( """ > CREATE TABLE t1 (f1 INT, f2 INT); > CREATE DEFAULT INDEX ON t1; > INSERT INTO t1 SELECT generate_series(1, 10); """ ), ] def benchmark(self) -> MeasurementSource: random.seed(self.seed()) statements = [] for _ in range(0, 10000): statements.append( f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}" ) update_statements_str = "\n".join(statements) return Td( f""" > SELECT 1 /* A */ 1 {update_statements_str} /* B */ """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: random.seed(self.seed()) statements = [] for _ in range(0, 10000): statements.append( f"> UPDATE t1 SET f1 = {random.randint(0, 100000)}, f2 = {random.randint(0, 100000)} WHERE f1 % 10 = {random.randint(0, 10)}" ) update_statements_str = "\n".join(statements) return Td( f""" > SELECT 1 /* A */ 1 {update_statements_str} /* B */ """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( """ > CREATE TABLE t1 (f1 INT, f2 INT); > CREATE DEFAULT INDEX ON t1; > INSERT INTO t1 SELECT generate_series(1, 10); """ ), ]
Inherited members
class MinMax (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark scenarios around individual dataflow patterns/operators
Expand source code Browse git
class MinMax(Dataflow): def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT MIN(f1), MAX(f1) AS f1 FROM v1 /* B */ 0 {self.n()-1} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > SELECT MIN(f1), MAX(f1) AS f1 FROM v1 /* B */ 0 {self.n()-1} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.view_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class MinMaxMaintained (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark MinMax as an indexed view, which renders a dataflow for incremental maintenance, in contrast with one-shot SELECT processing
Expand source code Browse git
class MinMaxMaintained(Dataflow): """Benchmark MinMax as an indexed view, which renders a dataflow for incremental maintenance, in contrast with one-shot SELECT processing""" def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ] def benchmark(self) -> MeasurementSource: return Td( f""" > DROP VIEW IF EXISTS v2 /* A */ > CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1 > CREATE DEFAULT INDEX ON v2 > SELECT * FROM v2 /* B */ 0 {self.n()-1} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > DROP VIEW IF EXISTS v2 /* A */ > CREATE VIEW v2 AS SELECT MIN(f1), MAX(f1) AS f1 FROM v1 > CREATE DEFAULT INDEX ON v2 > SELECT * FROM v2 /* B */ 0 {self.n()-1} """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; > SELECT COUNT(*) = {self.n()} FROM v1; true """ ), ]
Inherited members
class MySqlCdc (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class MySqlCdc(Scenario): pass
Ancestors
Subclasses
Inherited members
class MySqlInitialLoad (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to read 1M existing records from MySQL when creating a materialized source
Expand source code Browse git
class MySqlInitialLoad(MySqlCdc): """Measure the time it takes to read 1M existing records from MySQL when creating a materialized source""" FIXED_SCALE = True # TODO: Remove when #25323 is fixed def shared(self) -> Action: return TdAction( f""" $ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; SET @i:=0; CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT); INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()}; """ ) def before(self) -> Action: return TdAction( """ > DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE """ ) def benchmark(self) -> MeasurementSource: return Td( f""" > CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}' > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL ( HOST mysql, USER root, PASSWORD SECRET mysqlpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE mz_source_mysqlcdc IN CLUSTER source_cluster FROM MYSQL CONNECTION mysql_conn FOR TABLES (public.pk_table) /* A */ > SELECT count(*) FROM pk_table /* B */ {self.n()} """ )
Ancestors
Class variables
var FIXED_SCALE : bool
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action: return TdAction( """ > DROP SOURCE IF EXISTS mz_source_mysqlcdc CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE """ )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}' > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL ( HOST mysql, USER root, PASSWORD SECRET mysqlpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE mz_source_mysqlcdc IN CLUSTER source_cluster FROM MYSQL CONNECTION mysql_conn FOR TABLES (public.pk_table) /* A */ > SELECT count(*) FROM pk_table /* B */ {self.n()} """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( f""" $ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; SET @i:=0; CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT); INSERT INTO pk_table SELECT @i:=@i+1, @i*@i FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {self.n()}; """ )
Inherited members
class MySqlStreaming (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to ingest records from MySQL post-snapshot
Expand source code Browse git
class MySqlStreaming(MySqlCdc): """Measure the time it takes to ingest records from MySQL post-snapshot""" SCALE = 5 @classmethod def can_run(cls, version: MzVersion) -> bool: return version >= MzVersion.parse_mz("v0.88.0-dev") def shared(self) -> Action: return TdAction( """ $ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; """ ) def before(self) -> Action: return TdAction( f""" > DROP SOURCE IF EXISTS s1 CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE; $ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; DROP TABLE IF EXISTS t1; CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT); > CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}' > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL ( HOST mysql, USER root, PASSWORD SECRET mysqlpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM MYSQL CONNECTION mysql_conn FOR TABLES (public.t1) """ ) def benchmark(self) -> MeasurementSource: insertions = "\n".join( [ dedent( f""" SET @i:=0; INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {round(self.n()/1000)}; COMMIT; """ ) for i in range(0, 1000) ] ) return Td( f""" > SELECT 1; /* A */ 1 $ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} $ mysql-execute name=mysql USE public; {insertions} > SELECT count(*) FROM t1 /* B */ {self.n()} """ )
Ancestors
Class variables
var SCALE : float
Static methods
def can_run(version: MzVersion) ‑> bool
-
Expand source code Browse git
@classmethod def can_run(cls, version: MzVersion) -> bool: return version >= MzVersion.parse_mz("v0.88.0-dev")
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action: return TdAction( f""" > DROP SOURCE IF EXISTS s1 CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE; $ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; DROP TABLE IF EXISTS t1; CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT); > CREATE SECRET IF NOT EXISTS mysqlpass AS '${{arg.mysql-root-password}}' > CREATE CONNECTION IF NOT EXISTS mysql_conn TO MYSQL ( HOST mysql, USER root, PASSWORD SECRET mysqlpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM MYSQL CONNECTION mysql_conn FOR TABLES (public.t1) """ )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: insertions = "\n".join( [ dedent( f""" SET @i:=0; INSERT INTO t1 (f2) SELECT @i:=@i+1 FROM mysql.time_zone t1, mysql.time_zone t2 LIMIT {round(self.n()/1000)}; COMMIT; """ ) for i in range(0, 1000) ] ) return Td( f""" > SELECT 1; /* A */ 1 $ mysql-connect name=mysql url=mysql://root@mysql password=${{arg.mysql-root-password}} $ mysql-execute name=mysql USE public; {insertions} > SELECT count(*) FROM t1 /* B */ {self.n()} """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( """ $ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-password} $ mysql-execute name=mysql DROP DATABASE IF EXISTS public; CREATE DATABASE public; USE public; """ )
Inherited members
class OrderBy (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark ORDER BY as executed by the dataflow layer, in contrast with an ORDER BY executed using a Finish step in the coordinator
Expand source code Browse git
class OrderBy(Dataflow): """Benchmark ORDER BY as executed by the dataflow layer, in contrast with an ORDER BY executed using a Finish step in the coordinator""" def init(self) -> Action: # Just to spice things up a bit, we perform individual # inserts here so that the rows are assigned separate timestamps inserts = "\n\n".join(f"> INSERT INTO ten VALUES ({i})" for i in range(0, 10)) return TdAction( f""" > CREATE TABLE ten (f1 INTEGER); > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; {inserts} > SELECT COUNT(*) = {self.n()} FROM v1; true """ ) def benchmark(self) -> MeasurementSource: # Explicit LIMIT is needed for the ORDER BY to not be optimized away return Td( f""" > DROP MATERIALIZED VIEW IF EXISTS v2 /* A */ > CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999 > SELECT COUNT(*) FROM v2 /* B */ {self.n()} """ )
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: # Explicit LIMIT is needed for the ORDER BY to not be optimized away return Td( f""" > DROP MATERIALIZED VIEW IF EXISTS v2 /* A */ > CREATE MATERIALIZED VIEW v2 AS SELECT * FROM v1 ORDER BY f1 LIMIT 999999999999 > SELECT COUNT(*) FROM v2 /* B */ {self.n()} """ )
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action: # Just to spice things up a bit, we perform individual # inserts here so that the rows are assigned separate timestamps inserts = "\n\n".join(f"> INSERT INTO ten VALUES ({i})" for i in range(0, 10)) return TdAction( f""" > CREATE TABLE ten (f1 INTEGER); > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} AS f1 FROM {self.join()}; {inserts} > SELECT COUNT(*) = {self.n()} FROM v1; true """ )
Inherited members
class PgCdc (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class PgCdc(Scenario): pass
Ancestors
Subclasses
Inherited members
class PgCdcInitialLoad (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to read 1M existing records from Postgres when creating a materialized source
Expand source code Browse git
class PgCdcInitialLoad(PgCdc): """Measure the time it takes to read 1M existing records from Postgres when creating a materialized source""" def shared(self) -> Action: return TdAction( f""" $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER USER postgres WITH replication; DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public; DROP PUBLICATION IF EXISTS mz_source; CREATE PUBLICATION mz_source FOR ALL TABLES; CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT); INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x; ALTER TABLE pk_table REPLICA IDENTITY FULL; """ ) def before(self) -> Action: return TdAction( """ > DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE """ ) def benchmark(self) -> MeasurementSource: return Td( f""" > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres' > CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES ( HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE mz_source_pgcdc IN CLUSTER source_cluster FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source') FOR TABLES ("pk_table") /* A */ > SELECT count(*) FROM pk_table /* B */ {self.n()} """ )
Ancestors
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action: return TdAction( """ > DROP SOURCE IF EXISTS mz_source_pgcdc CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE """ )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres' > CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES ( HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE mz_source_pgcdc IN CLUSTER source_cluster FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'mz_source') FOR TABLES ("pk_table") /* A */ > SELECT count(*) FROM pk_table /* B */ {self.n()} """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( f""" $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER USER postgres WITH replication; DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public; DROP PUBLICATION IF EXISTS mz_source; CREATE PUBLICATION mz_source FOR ALL TABLES; CREATE TABLE pk_table (pk BIGINT PRIMARY KEY, f2 BIGINT); INSERT INTO pk_table SELECT x, x*2 FROM generate_series(1, {self.n()}) as x; ALTER TABLE pk_table REPLICA IDENTITY FULL; """ )
Inherited members
class PgCdcStreaming (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to ingest records from Postgres post-snapshot
Expand source code Browse git
class PgCdcStreaming(PgCdc): """Measure the time it takes to ingest records from Postgres post-snapshot""" SCALE = 5 def shared(self) -> Action: return TdAction( """ $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER USER postgres WITH replication; DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public; DROP PUBLICATION IF EXISTS p1; CREATE PUBLICATION p1 FOR ALL TABLES; """ ) def before(self) -> Action: return TdAction( f""" > DROP SOURCE IF EXISTS s1 CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE; $ postgres-execute connection=postgres://postgres:postgres@postgres DROP TABLE IF EXISTS t1; CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT); ALTER TABLE t1 REPLICA IDENTITY FULL; > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres' > CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES ( HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1') FOR TABLES ("t1") """ ) def benchmark(self) -> MeasurementSource: insertions = "\n".join( [ f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {self.n()/1000}) as x;\nCOMMIT;" for i in range(0, 1000) ] ) return Td( f""" > SELECT 1; /* A */ 1 $ postgres-execute connection=postgres://postgres:postgres@postgres {insertions} > SELECT count(*) FROM t1 /* B */ {self.n()} """ )
Ancestors
Class variables
var SCALE : float
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action: return TdAction( f""" > DROP SOURCE IF EXISTS s1 CASCADE; > DROP CLUSTER IF EXISTS source_cluster CASCADE; $ postgres-execute connection=postgres://postgres:postgres@postgres DROP TABLE IF EXISTS t1; CREATE TABLE t1 (pk SERIAL PRIMARY KEY, f2 BIGINT); ALTER TABLE t1 REPLICA IDENTITY FULL; > CREATE SECRET IF NOT EXISTS pgpass AS 'postgres' > CREATE CONNECTION IF NOT EXISTS pg_conn TO POSTGRES ( HOST postgres, DATABASE postgres, USER postgres, PASSWORD SECRET pgpass ) > CREATE CLUSTER source_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s1 IN CLUSTER source_cluster FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'p1') FOR TABLES ("t1") """ )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: insertions = "\n".join( [ f"INSERT INTO t1 (f2) SELECT x FROM generate_series(1, {self.n()/1000}) as x;\nCOMMIT;" for i in range(0, 1000) ] ) return Td( f""" > SELECT 1; /* A */ 1 $ postgres-execute connection=postgres://postgres:postgres@postgres {insertions} > SELECT count(*) FROM t1 /* B */ {self.n()} """ )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( """ $ postgres-execute connection=postgres://postgres:postgres@postgres ALTER USER postgres WITH replication; DROP SCHEMA IF EXISTS public CASCADE; CREATE SCHEMA public; DROP PUBLICATION IF EXISTS p1; CREATE PUBLICATION p1 FOR ALL TABLES; """ )
Inherited members
class QueryLatency (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks pertaining to the coordinator.
Expand source code Browse git
class QueryLatency(Coordinator): SCALE = 3 """Measure the time it takes to run SELECT 1 queries""" def benchmark(self) -> MeasurementSource: selects = "\n".join("> SELECT 1\n1\n" for i in range(0, self.n())) return Td( f""" > SET auto_route_introspection_queries TO false > BEGIN > SELECT 1; /* A */ 1 {selects} > SELECT 1; /* B */ 1 """ )
Ancestors
Class variables
var SCALE : float
-
Measure the time it takes to run SELECT 1 queries
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: selects = "\n".join("> SELECT 1\n1\n" for i in range(0, self.n())) return Td( f""" > SET auto_route_introspection_queries TO false > BEGIN > SELECT 1; /* A */ 1 {selects} > SELECT 1; /* B */ 1 """ )
Inherited members
class Retraction (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Benchmark the time it takes to process a very large retraction
Expand source code Browse git
class Retraction(Dataflow): """Benchmark the time it takes to process a very large retraction""" def before(self) -> Action: return TdAction( f""" > DROP TABLE IF EXISTS ten CASCADE; > CREATE TABLE ten (f1 INTEGER); > INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()} > SELECT COUNT(*) = {self.n()} AS f1 FROM v1; true """ ) def benchmark(self) -> MeasurementSource: return Td( """ > SELECT 1 /* A */ 1 > DELETE FROM ten; > SELECT COUNT(*) FROM v1 /* B */ 0 """ )
Ancestors
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action: return TdAction( f""" > DROP TABLE IF EXISTS ten CASCADE; > CREATE TABLE ten (f1 INTEGER); > INSERT INTO ten VALUES (0),(1),(2),(3),(4),(5),(6),(7),(8),(9); > CREATE MATERIALIZED VIEW v1 AS SELECT {self.unique_values()} FROM {self.join()} > SELECT COUNT(*) = {self.n()} AS f1 FROM v1; true """ )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( """ > SELECT 1 /* A */ 1 > DELETE FROM ten; > SELECT COUNT(*) FROM v1 /* B */ 0 """ )
Inherited members
class Sink (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class Sink(Scenario): pass
Ancestors
Subclasses
Inherited members
class Startup (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class Startup(Scenario): pass
Ancestors
Subclasses
Inherited members
class StartupEmpty (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to restart an empty Mz instance.
Expand source code Browse git
class StartupEmpty(Startup): """Measure the time it takes to restart an empty Mz instance.""" def benchmark(self) -> BenchmarkingSequence: return [ Lambda(lambda e: e.RestartMz()), Td( """ > SELECT 1; /* B */ 1 """ ), ]
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
-
Expand source code Browse git
def benchmark(self) -> BenchmarkingSequence: return [ Lambda(lambda e: e.RestartMz()), Td( """ > SELECT 1; /* B */ 1 """ ), ]
Inherited members
class StartupLoaded (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something
Expand source code Browse git
class StartupLoaded(Scenario):
    """Measure the time it takes to restart a populated Mz instance and have all the dataflows be ready to return something"""

    SCALE = 1.2  # 25 objects of each kind
    FIXED_SCALE = (
        True  # Can not scale to 100s of objects, so --size=+N will have no effect
    )

    def shared(self) -> Action:
        # Single shared Kafka topic with one record; every source below reads it.
        return TdAction(
            self.schema()
            + """
$ kafka-create-topic topic=startup-time

$ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1
{"f2": 1}
"""
        )

    def init(self) -> Action:
        # Build n() tables, sources (each in its own cluster), views joining
        # the sources, and sinks, so the restart has real dataflows to rehydrate.
        create_tables = "\n".join(
            f"> CREATE TABLE t{i} (f1 INTEGER);\n> INSERT INTO t{i} DEFAULT VALUES;"
            for i in range(0, self.n())
        )
        create_sources = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS source{i}_cluster CASCADE;
> CREATE CLUSTER source{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE source{i}
  IN CLUSTER source{i}_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE NONE
"""
            for i in range(0, self.n())
        )
        # Join width scales with the scenario scale, not with n().
        join = " ".join(
            f"LEFT JOIN source{i} USING (f2)" for i in range(1, (ceil(self.scale())))
        )
        create_views = "\n".join(
            f"> CREATE MATERIALIZED VIEW v{i} AS SELECT * FROM source{i} AS s {join} LIMIT {i+1}"
            for i in range(0, self.n())
        )
        create_sinks = "\n".join(
            f"""
> DROP CLUSTER IF EXISTS sink{i}_cluster;
> CREATE CLUSTER sink{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink{i}
  IN CLUSTER sink{i}_cluster
  FROM source{i}
  INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE DEBEZIUM
"""
            for i in range(0, self.n())
        )
        # Raise the system object limits first, since n() objects of each kind
        # would otherwise exceed the defaults; then recreate everything from scratch.
        return TdAction(
            f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};

> DROP OWNED BY materialize CASCADE;

>[version<7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');

>[version>=7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

{create_tables}

{create_sources}

{create_views}

{create_sinks}
"""
        )

    def benchmark(self) -> BenchmarkingSequence:
        # After restart, poll every view, source and table until all return
        # rows; the trailing SELECT 1 (/* B */) marks the end of the measurement.
        check_tables = "\n".join(
            f"> SELECT COUNT(*) >= 0 FROM t{i}\ntrue" for i in range(0, self.n())
        )
        check_sources = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM source{i}\ntrue" for i in range(0, self.n())
        )
        check_views = "\n".join(
            f"> SELECT COUNT(*) > 0 FROM v{i}\ntrue" for i in range(0, self.n())
        )
        return [
            Lambda(lambda e: e.RestartMz()),
            Td(
                f"""
{check_views}
{check_sources}
{check_tables}
> SELECT 1; /* B */
1
"""
            ),
        ]
Ancestors
Class variables
var FIXED_SCALE : bool
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource | list[Action | MeasurementSource]
-
Expand source code Browse git
def benchmark(self) -> BenchmarkingSequence:
    # Restart Mz, then poll each object class until it answers; views are
    # checked first since they depend on the sources. /* B */ ends the timing.
    check_tables = "\n".join(
        f"> SELECT COUNT(*) >= 0 FROM t{i}\ntrue" for i in range(0, self.n())
    )
    check_sources = "\n".join(
        f"> SELECT COUNT(*) > 0 FROM source{i}\ntrue" for i in range(0, self.n())
    )
    check_views = "\n".join(
        f"> SELECT COUNT(*) > 0 FROM v{i}\ntrue" for i in range(0, self.n())
    )
    return [
        Lambda(lambda e: e.RestartMz()),
        Td(
            f"""
{check_views}
{check_sources}
{check_tables}
> SELECT 1; /* B */
1
"""
        ),
    ]
def init(self) ‑> Action
-
Expand source code Browse git
def init(self) -> Action:
    # Create n() tables, sources, materialized views and sinks so the restart
    # benchmark has a realistic population of dataflows to rehydrate.
    create_tables = "\n".join(
        f"> CREATE TABLE t{i} (f1 INTEGER);\n> INSERT INTO t{i} DEFAULT VALUES;"
        for i in range(0, self.n())
    )
    create_sources = "\n".join(
        f"""
> DROP CLUSTER IF EXISTS source{i}_cluster CASCADE;
> CREATE CLUSTER source{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SOURCE source{i}
  IN CLUSTER source{i}_cluster
  FROM KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-startup-time-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE NONE
"""
        for i in range(0, self.n())
    )
    # Join width scales with scale(), not n().
    join = " ".join(
        f"LEFT JOIN source{i} USING (f2)" for i in range(1, (ceil(self.scale())))
    )
    create_views = "\n".join(
        f"> CREATE MATERIALIZED VIEW v{i} AS SELECT * FROM source{i} AS s {join} LIMIT {i+1}"
        for i in range(0, self.n())
    )
    create_sinks = "\n".join(
        f"""
> DROP CLUSTER IF EXISTS sink{i}_cluster;
> CREATE CLUSTER sink{i}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;
> CREATE SINK sink{i}
  IN CLUSTER sink{i}_cluster
  FROM source{i}
  INTO KAFKA CONNECTION s1_kafka_conn (TOPIC 'testdrive-sink-output-${{testdrive.seed}}')
  KEY (f2)
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION s1_csr_conn
  ENVELOPE DEBEZIUM
"""
        for i in range(0, self.n())
    )
    # Raise system limits first so n() objects of each kind can be created.
    return TdAction(
        f"""
$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${{testdrive.materialize-internal-sql-addr}}

$ postgres-execute connection=mz_system
ALTER SYSTEM SET max_objects_per_schema = {self.n() * 10};
ALTER SYSTEM SET max_materialized_views = {self.n() * 2};
ALTER SYSTEM SET max_sources = {self.n() * 2};
ALTER SYSTEM SET max_sinks = {self.n() * 2};
ALTER SYSTEM SET max_tables = {self.n() * 2};
ALTER SYSTEM SET max_clusters = {self.n() * 6};

> DROP OWNED BY materialize CASCADE;

>[version<7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}');

>[version>=7800] CREATE CONNECTION IF NOT EXISTS s1_kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT);

> CREATE CONNECTION IF NOT EXISTS s1_csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${{testdrive.schema-registry-url}}';

{create_tables}

{create_sources}

{create_views}

{create_sinks}
"""
    )
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( self.schema() + """ $ kafka-create-topic topic=startup-time $ kafka-ingest format=avro topic=startup-time schema=${schema} repeat=1 {"f2": 1} """ )
Inherited members
class SwapSchema (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Expand source code Browse git
class SwapSchema(Scenario):
    """Measure ALTER SCHEMA ... SWAP on schemas carrying many dependent views."""

    SCALE = 2
    FIXED_SCALE = True

    def init(self) -> list[Action]:
        # n() views per schema directly over each table, plus two layers of
        # "noise" views hanging off blue.v0 so the swap must rewrite a
        # realistic dependency graph.
        def batch(make_stmt) -> str:
            return "\n".join(make_stmt(i) for i in range(0, self.n()))

        blue_views_on_table = batch(
            lambda i: f"> CREATE VIEW blue.v{i} AS SELECT * FROM blue.t1;"
        )
        green_views_on_table = batch(
            lambda i: f"> CREATE VIEW green.v{i} AS SELECT * FROM green.t1;"
        )
        noise_views_on_blue_view = batch(
            lambda i: f"> CREATE VIEW noise.v{i} AS SELECT * FROM blue.v0;"
        )
        noise_views_on_noise_view = batch(
            lambda i: f"> CREATE VIEW noise.extra_v{i} AS SELECT * FROM noise.v0;"
        )
        setup = f"""
> CREATE SCHEMA blue;
> CREATE SCHEMA green;
> CREATE SCHEMA noise;
> CREATE TABLE blue.t1 (a int, b text);
> CREATE TABLE green.t1 (a int, b text);
{blue_views_on_table}
{green_views_on_table}
{noise_views_on_blue_view}
{noise_views_on_noise_view}
"""
        return [TdAction(setup)]

    def benchmark(self) -> MeasurementSource:
        # Time the schema swap itself, bracketed by trivial queries.
        script = dedent(
            """
            > SELECT 1; /* A */
            1
            > ALTER SCHEMA blue SWAP WITH green;
            > SELECT 1; /* B */
            1
            """
        )
        return Td(script)
Ancestors
Class variables
var FIXED_SCALE : bool
var SCALE : float
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( dedent( """ > SELECT 1; /* A */ 1 > ALTER SCHEMA blue SWAP WITH green; > SELECT 1; /* B */ 1 """ ) )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]:
    # n() views per schema over each table, plus two layers of "noise" views
    # that depend on blue.v0 (and on each other) so the swap has a realistic
    # dependency graph to rewrite.
    blue_views_on_table = "\n".join(
        f"> CREATE VIEW blue.v{i} AS SELECT * FROM blue.t1;"
        for i in range(0, self.n())
    )
    green_views_on_table = "\n".join(
        f"> CREATE VIEW green.v{i} AS SELECT * FROM green.t1;"
        for i in range(0, self.n())
    )
    noise_views_on_blue_view = "\n".join(
        f"> CREATE VIEW noise.v{i} AS SELECT * FROM blue.v0;"
        for i in range(0, self.n())
    )
    noise_views_on_noise_view = "\n".join(
        f"> CREATE VIEW noise.extra_v{i} AS SELECT * FROM noise.v0;"
        for i in range(0, self.n())
    )
    return [
        TdAction(
            f"""
> CREATE SCHEMA blue;
> CREATE SCHEMA green;
> CREATE SCHEMA noise;
> CREATE TABLE blue.t1 (a int, b text);
> CREATE TABLE green.t1 (a int, b text);
{blue_views_on_table}
{green_views_on_table}
{noise_views_on_blue_view}
{noise_views_on_noise_view}
"""
        ),
    ]
Inherited members
class Update (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes for an UPDATE statement to return to client
Expand source code Browse git
class Update(DML):
    """Measure the time it takes for an UPDATE statement to return to client"""

    def init(self) -> list[Action]:
        # Indexed table populated once; the benchmark updates it in place.
        populate = f"""
> CREATE TABLE t1 (f1 BIGINT);
> CREATE DEFAULT INDEX ON t1;
> INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()}
"""
        return [self.table_ten(), TdAction(populate)]

    def benchmark(self) -> MeasurementSource:
        # Time from /* A */ until the UPDATE statement returns at /* B */.
        script = f"""
> SELECT 1 /* A */
1
> UPDATE t1 SET f1 = f1 + {self.n()} /* B */
"""
        return Td(script)
Ancestors
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > UPDATE t1 SET f1 = f1 + {self.n()} /* B */ """ )
def init(self) ‑> list[Action]
-
Expand source code Browse git
def init(self) -> list[Action]: return [ self.table_ten(), TdAction( f""" > CREATE TABLE t1 (f1 BIGINT); > CREATE DEFAULT INDEX ON t1; > INSERT INTO t1 SELECT {self.unique_values()} FROM {self.join()} """ ), ]
Inherited members
class UpdateMultiNoIndex (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub Issue #11071
Expand source code Browse git
class UpdateMultiNoIndex(DML):
    """Measure the time it takes to perform multiple updates over the same records in a non-indexed table. GitHub Issue #11071"""

    def before(self) -> Action:
        # Due to extreme variability in the results, we have no option but to drop and re-create
        # the table prior to each measurement
        return TdAction(
            f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 BIGINT);
> INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
"""
        )

    def benchmark(self) -> MeasurementSource:
        # generate_series(0, n) yields n+1 rows; after adding n the values
        # span n..2n, so exactly n rows satisfy f1 > n — the final COUNT both
        # validates the update and marks the end of the measurement (/* B */).
        return Td(
            f"""
> SELECT 1 /* A */
1
> UPDATE t1 SET f1 = f1 + {self.n()}
> SELECT COUNT(*) FROM t1 WHERE f1 > {self.n()} /* B */
{self.n()}
"""
        )
Ancestors
Methods
def before(self) ‑> Action
-
Expand source code Browse git
def before(self) -> Action:
    # Due to extreme variability in the results, we have no option but to drop and re-create
    # the table prior to each measurement
    return TdAction(
        f"""
> DROP TABLE IF EXISTS t1;
> CREATE TABLE t1 (f1 BIGINT);
> INSERT INTO t1 SELECT * FROM generate_series(0, {self.n()})
"""
    )
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: return Td( f""" > SELECT 1 /* A */ 1 > UPDATE t1 SET f1 = f1 + {self.n()} > SELECT COUNT(*) FROM t1 WHERE f1 > {self.n()} /* B */ {self.n()} """ )
Inherited members