Module materialize.feature_benchmark.scenarios.concurrency
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from materialize.feature_benchmark.action import Action, TdAction
from materialize.feature_benchmark.measurement_source import MeasurementSource, Td
from materialize.feature_benchmark.scenario import Scenario
class Concurrency(Scenario):
    """Feature benchmarks related to testing concurrency aspects of the system

    This is a grouping base class: it adds nothing on top of ``Scenario``.
    The concrete concurrency scenarios in this module (``ParallelIngestion``
    and ``ParallelDataflows``) derive from it.
    """
class ParallelIngestion(Concurrency):
    """Measure the time it takes to ingest multiple sources concurrently.

    ``shared()`` creates one Kafka topic and ingests ``self.n()`` Avro records
    into it.  ``benchmark()`` creates ``SOURCES`` independent sources over that
    same topic, each in a dedicated cluster.  It then builds a default index
    on every source and queries each one for its last record.
    """

    SOURCES = 10
    FIXED_SCALE = True  # Disk slowness in CRDB leading to CRDB going down

    def shared(self) -> Action:
        """Create and populate the Kafka topic that every source reads.

        Ingests ``self.n()`` records with key ``{"f1": i}`` and value
        ``{"f2": i}``.  Presumably ``i`` runs over 0..n-1; ``benchmark()``
        relies on the last value being ``n - 1``.
        """
        # NOTE(review): self.schema()/self.keyschema() presumably define the
        # ${schema}/${keyschema} testdrive variables used below -- confirm in
        # Scenario.
        return TdAction(
            self.schema()
            + self.keyschema()
            + f"""
$ kafka-create-topic topic=kafka-parallel-ingestion partitions=4

$ kafka-ingest format=avro topic=kafka-parallel-ingestion key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()}
{{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }}
"""
        )

    def benchmark(self) -> MeasurementSource:
        """Create ``SOURCES`` sources, index them, and read back each one.

        The script first drops any leftovers from a previous run.  It then
        creates the connections, clusters and sources.  The ``/* A */`` and
        ``/* B */`` queries bracket the index creation and the per-source
        reads.  Presumably the framework measures the interval between those
        two markers (confirm in ``Td``/``MeasurementSource``).
        """
        # Use the attribute via ``self`` so subclasses can override SOURCES.
        sources = range(1, self.SOURCES + 1)
        last = self.n() - 1

        drop_sources = "\n".join(
            f"""
> DROP SOURCE IF EXISTS s{s}

> DROP CLUSTER IF EXISTS s{s}_cluster
"""
            for s in sources
        )

        # The connections are shared by every source, so create them once
        # rather than re-issuing identical IF NOT EXISTS statements per source.
        create_connections = """
> CREATE CONNECTION IF NOT EXISTS csr_conn
  FOR CONFLUENT SCHEMA REGISTRY
  URL '${testdrive.schema-registry-url}';

>[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}');

>[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT);
"""

        # One cluster per source, presumably so the sources ingest
        # independently of each other.  Indented lines are testdrive
        # continuations of the preceding statement.
        create_sources = "\n".join(
            f"""
> CREATE CLUSTER s{s}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1;

> CREATE SOURCE s{s}
  IN CLUSTER s{s}_cluster
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-parallel-ingestion-${{testdrive.seed}}')
  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
"""
            for s in sources
        )

        create_indexes = "\n".join(
            f"""
> CREATE DEFAULT INDEX ON s{s}
"""
            for s in sources
        )

        # Each source must return the last ingested record (f2 = n - 1).
        selects = "\n".join(
            f"""
> SELECT * FROM s{s} WHERE f2 = {last}
{last}
"""
            for s in sources
        )

        return Td(
            self.schema()
            + f"""
{drop_sources}

{create_connections}

{create_sources}

> SELECT 1
  /* A */
1

{create_indexes}

{selects}

> SELECT 1
  /* B */
1
"""
        )
class ParallelDataflows(Concurrency):
    """Measure the time it takes to compute multiple parallel dataflows.

    Recreates the ``public`` schema.  It then creates ``VIEWS`` materialized
    views, each counting the distinct values of ``generate_series(1, n)``,
    and reads every view back, expecting ``n``.
    """

    SCALE = 6
    VIEWS = 25

    def benchmark(self) -> MeasurementSource:
        """Create ``VIEWS`` materialized views and wait for each result.

        The ``/* A */`` and ``/* B */`` queries bracket the view creation and
        the reads.  Presumably the framework measures the interval between
        those two markers (confirm in ``Td``/``MeasurementSource``).
        """
        n = self.n()
        # Use the attribute via ``self`` so subclasses can override VIEWS.
        views = range(1, self.VIEWS + 1)

        # `+ {v} - {v}` leaves f1 unchanged.  Presumably it exists so that
        # each view has a textually distinct definition.  Indented lines are
        # testdrive continuations of the CREATE statement.
        create_views = "\n".join(
            f"""
> CREATE MATERIALIZED VIEW v{v} AS
  SELECT COUNT(DISTINCT generate_series) + {v} - {v} AS f1
  FROM generate_series(1,{n})
"""
            for v in views
        )

        selects = "\n".join(
            f"""
> SELECT * FROM v{v}
{n}
"""
            for v in views
        )

        # The old schema is dropped over the mz_system connection (port 6877).
        # It is then recreated with a regular `>` statement.
        return Td(
            f"""
$ postgres-execute connection=postgres://mz_system@materialized:6877/materialize
DROP SCHEMA public CASCADE;

> CREATE SCHEMA public;

> SELECT 1
  /* A */
1

{create_views}

{selects}

> SELECT 1
  /* B */
1
"""
        )
Classes
class Concurrency (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Feature benchmarks related to testing concurrency aspects of the system
Expand source code Browse git
class Concurrency(Scenario): """Feature benchmarks related to testing concurrency aspects of the system"""
Ancestors
Subclasses
Inherited members
class ParallelDataflows (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to compute multiple parallel dataflows.
Expand source code Browse git
class ParallelDataflows(Concurrency): """Measure the time it takes to compute multiple parallel dataflows.""" SCALE = 6 VIEWS = 25 def benchmark(self) -> MeasurementSource: views = range(1, ParallelDataflows.VIEWS + 1) create_views = "\n".join( [ f""" > CREATE MATERIALIZED VIEW v{v} AS SELECT COUNT(DISTINCT generate_series) + {v} - {v} AS f1 FROM generate_series(1,{self.n()}) """ for v in views ] ) selects = "\n".join( [ f""" > SELECT * FROM v{v} {self.n()} """ for v in views ] ) return Td( f""" $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize DROP SCHEMA public CASCADE; > CREATE SCHEMA public; > SELECT 1 /* A */ 1 {create_views} {selects} > SELECT 1 /* B */ 1 """ )
Ancestors
Class variables
var SCALE : float
var VIEWS
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: views = range(1, ParallelDataflows.VIEWS + 1) create_views = "\n".join( [ f""" > CREATE MATERIALIZED VIEW v{v} AS SELECT COUNT(DISTINCT generate_series) + {v} - {v} AS f1 FROM generate_series(1,{self.n()}) """ for v in views ] ) selects = "\n".join( [ f""" > SELECT * FROM v{v} {self.n()} """ for v in views ] ) return Td( f""" $ postgres-execute connection=postgres://mz_system@materialized:6877/materialize DROP SCHEMA public CASCADE; > CREATE SCHEMA public; > SELECT 1 /* A */ 1 {create_views} {selects} > SELECT 1 /* B */ 1 """ )
Inherited members
class ParallelIngestion (scale: float, mz_version: MzVersion, default_size: int, seed: int)
-
Measure the time it takes to ingest multiple sources concurrently.
Expand source code Browse git
class ParallelIngestion(Concurrency): """Measure the time it takes to ingest multiple sources concurrently.""" SOURCES = 10 FIXED_SCALE = True # Disk slowness in CRDB leading to CRDB going down def shared(self) -> Action: return TdAction( self.schema() + self.keyschema() + f""" $ kafka-create-topic topic=kafka-parallel-ingestion partitions=4 $ kafka-ingest format=avro topic=kafka-parallel-ingestion key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ ) def benchmark(self) -> MeasurementSource: sources = range(1, ParallelIngestion.SOURCES + 1) drop_sources = "\n".join( [ f""" > DROP SOURCE IF EXISTS s{s} > DROP CLUSTER IF EXISTS s{s}_cluster """ for s in sources ] ) create_sources = "\n".join( [ f""" > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CLUSTER s{s}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s{s} IN CLUSTER s{s}_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-parallel-ingestion-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn """ for s in sources ] ) create_indexes = "\n".join( [ f""" > CREATE DEFAULT INDEX ON s{s} """ for s in sources ] ) selects = "\n".join( [ f""" > SELECT * FROM s{s} WHERE f2 = {self.n()-1} {self.n()-1} """ for s in sources ] ) return Td( self.schema() + f""" {drop_sources} {create_sources} > SELECT 1 /* A */ 1 {create_indexes} {selects} > SELECT 1 /* B */ 1 """ )
Ancestors
Class variables
var FIXED_SCALE : bool
var SOURCES
Methods
def benchmark(self) ‑> MeasurementSource
-
Expand source code Browse git
def benchmark(self) -> MeasurementSource: sources = range(1, ParallelIngestion.SOURCES + 1) drop_sources = "\n".join( [ f""" > DROP SOURCE IF EXISTS s{s} > DROP CLUSTER IF EXISTS s{s}_cluster """ for s in sources ] ) create_sources = "\n".join( [ f""" > CREATE CONNECTION IF NOT EXISTS csr_conn FOR CONFLUENT SCHEMA REGISTRY URL '${{testdrive.schema-registry-url}}'; >[version<7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}'); >[version>=7800] CREATE CONNECTION IF NOT EXISTS kafka_conn TO KAFKA (BROKER '${{testdrive.kafka-addr}}', SECURITY PROTOCOL PLAINTEXT); > CREATE CLUSTER s{s}_cluster SIZE '{self._default_size}', REPLICATION FACTOR 1; > CREATE SOURCE s{s} IN CLUSTER s{s}_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-kafka-parallel-ingestion-${{testdrive.seed}}') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn """ for s in sources ] ) create_indexes = "\n".join( [ f""" > CREATE DEFAULT INDEX ON s{s} """ for s in sources ] ) selects = "\n".join( [ f""" > SELECT * FROM s{s} WHERE f2 = {self.n()-1} {self.n()-1} """ for s in sources ] ) return Td( self.schema() + f""" {drop_sources} {create_sources} > SELECT 1 /* A */ 1 {create_indexes} {selects} > SELECT 1 /* B */ 1 """ )
def shared(self) ‑> Action
-
Expand source code Browse git
def shared(self) -> Action: return TdAction( self.schema() + self.keyschema() + f""" $ kafka-create-topic topic=kafka-parallel-ingestion partitions=4 $ kafka-ingest format=avro topic=kafka-parallel-ingestion key-format=avro key-schema=${{keyschema}} schema=${{schema}} repeat={self.n()} {{"f1": ${{kafka-ingest.iteration}} }} {{"f2": ${{kafka-ingest.iteration}} }} """ )
Inherited members