Module materialize.checks.all_checks.json_source
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
from textwrap import dedent
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check, externally_idempotent
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
@externally_idempotent(False)
class JsonSource(Check):
"""Test CREATE SOURCE ... FORMAT JSON"""
def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse_mz("v0.60.0-dev")
def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
$ kafka-create-topic topic=format-json partitions=1
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
"object":{"a":"b","c":"d"}
> CREATE CLUSTER single_replica_cluster SIZE '1';
> CREATE SOURCE format_jsonA
IN CLUSTER single_replica_cluster
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}')
KEY FORMAT JSON
VALUE FORMAT JSON
ENVELOPE UPSERT
> CREATE SOURCE format_jsonB
IN CLUSTER single_replica_cluster
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}')
KEY FORMAT JSON
VALUE FORMAT JSON
ENVELOPE UPSERT
"""
)
)
def manipulate(self) -> list[Testdrive]:
return [
Testdrive(dedent(s))
for s in [
"""
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
"float":1.23
"str":"hello"
""",
"""
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json
"array":[1,2,3]
"int":1
""",
]
]
def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT * FROM format_jsonA ORDER BY key
"\\"array\\"" [1,2,3]
"\\"float\\"" 1.23
"\\"int\\"" 1
"\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}"
"\\"str\\"" "\\"hello\\""
> SELECT * FROM format_jsonB ORDER BY key
"\\"array\\"" [1,2,3]
"\\"float\\"" 1.23
"\\"int\\"" 1
"\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}"
"\\"str\\"" "\\"hello\\""
> SHOW CREATE SOURCE format_jsonB;
materialize.public.format_jsonb "CREATE SOURCE \\"materialize\\".\\"public\\".\\"format_jsonb\\" IN CLUSTER \\"single_replica_cluster\\" FROM KAFKA CONNECTION \\"materialize\\".\\"public\\".\\"kafka_conn\\" (TOPIC = 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT EXPOSE PROGRESS AS \\"materialize\\".\\"public\\".\\"format_jsonb_progress\\""
"""
)
)
Classes
class JsonSource (base_version: MzVersion, rng: random.Random | None)
-
Test CREATE SOURCE … FORMAT JSON
Expand source code Browse git
@externally_idempotent(False) class JsonSource(Check): """Test CREATE SOURCE ... FORMAT JSON""" def _can_run(self, e: Executor) -> bool: return self.base_version >= MzVersion.parse_mz("v0.60.0-dev") def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ kafka-create-topic topic=format-json partitions=1 $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json "object":{"a":"b","c":"d"} > CREATE CLUSTER single_replica_cluster SIZE '1'; > CREATE SOURCE format_jsonA IN CLUSTER single_replica_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT > CREATE SOURCE format_jsonB IN CLUSTER single_replica_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT """ ) ) def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ """ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json "float":1.23 "str":"hello" """, """ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json "array":[1,2,3] "int":1 """, ] ] def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT * FROM format_jsonA ORDER BY key "\\"array\\"" [1,2,3] "\\"float\\"" 1.23 "\\"int\\"" 1 "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}" "\\"str\\"" "\\"hello\\"" > SELECT * FROM format_jsonB ORDER BY key "\\"array\\"" [1,2,3] "\\"float\\"" 1.23 "\\"int\\"" 1 "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}" "\\"str\\"" "\\"hello\\"" > SHOW CREATE SOURCE format_jsonB; materialize.public.format_jsonb "CREATE SOURCE \\"materialize\\".\\"public\\".\\"format_jsonb\\" IN CLUSTER \\"single_replica_cluster\\" FROM KAFKA CONNECTION \\"materialize\\".\\"public\\".\\"kafka_conn\\" (TOPIC = 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT EXPOSE PROGRESS AS \\"materialize\\".\\"public\\".\\"format_jsonb_progress\\"" """ ) )
Ancestors
Class variables
var externally_idempotent : bool
Methods
def initialize(self) ‑> Testdrive
-
Expand source code Browse git
def initialize(self) -> Testdrive: return Testdrive( dedent( """ $ kafka-create-topic topic=format-json partitions=1 $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json "object":{"a":"b","c":"d"} > CREATE CLUSTER single_replica_cluster SIZE '1'; > CREATE SOURCE format_jsonA IN CLUSTER single_replica_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT > CREATE SOURCE format_jsonB IN CLUSTER single_replica_cluster FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT """ ) )
def manipulate(self) ‑> list[Testdrive]
-
Expand source code Browse git
def manipulate(self) -> list[Testdrive]: return [ Testdrive(dedent(s)) for s in [ """ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json "float":1.23 "str":"hello" """, """ $ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=format-json "array":[1,2,3] "int":1 """, ] ]
def validate(self) ‑> Testdrive
-
Expand source code Browse git
def validate(self) -> Testdrive: return Testdrive( dedent( """ > SELECT * FROM format_jsonA ORDER BY key "\\"array\\"" [1,2,3] "\\"float\\"" 1.23 "\\"int\\"" 1 "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}" "\\"str\\"" "\\"hello\\"" > SELECT * FROM format_jsonB ORDER BY key "\\"array\\"" [1,2,3] "\\"float\\"" 1.23 "\\"int\\"" 1 "\\"object\\"" "{\\"a\\":\\"b\\",\\"c\\":\\"d\\"}" "\\"str\\"" "\\"hello\\"" > SHOW CREATE SOURCE format_jsonB; materialize.public.format_jsonb "CREATE SOURCE \\"materialize\\".\\"public\\".\\"format_jsonb\\" IN CLUSTER \\"single_replica_cluster\\" FROM KAFKA CONNECTION \\"materialize\\".\\"public\\".\\"kafka_conn\\" (TOPIC = 'testdrive-format-json-${testdrive.seed}') KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT EXPOSE PROGRESS AS \\"materialize\\".\\"public\\".\\"format_jsonb_progress\\"" """ ) )