Module materialize.data_ingest.definition
Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.
import random
from collections.abc import Iterator
from enum import Enum
from materialize.data_ingest.data_type import RecordSize
from materialize.data_ingest.field import Field
from materialize.data_ingest.row import Operation, Row
from materialize.data_ingest.rowlist import RowList
rng = random.Random()
class Records(Enum):
    """Named row counts used to size a generated workload.

    The value of each member is the number of rows it stands for. ``ALL``
    is the exception: its value of 0 is a marker, not a count.
    """

    # Only applies to DELETE operations.
    ALL = 0
    ONE = 1
    HUNDRED = 100
    SOME = 1_000
    MANY = 1_000_000
class Keyspace(Enum):
    """Key-selection modes that can be passed to an upsert definition.

    NOTE(review): the meaning of each member cannot be determined from
    this module alone -- confirm against the code that consumes it.
    """

    SINGLE_VALUE = 1
    LARGE = 2
    EXISTING = 3
class Target(Enum):
    """Enumeration of targets: Kafka, Postgres, or print.

    NOTE(review): presumably the destination generated rows are sent to;
    nothing in this module reads the enum -- confirm against its users.
    """

    KAFKA = 1
    POSTGRES = 2
    PRINT = 3
class Definition:
    """Base class for workload definitions.

    Subclasses override ``generate`` to produce ``RowList`` objects for a
    given list of fields.
    """

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Produce the row lists for ``fields``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class Insert(Definition):
    """Definition that emits INSERT rows with consecutive numeric keys.

    The key counter (``current_key``) is stored on the instance, so it
    carries over between ``generate`` calls: once ``count`` rows have been
    produced in total, further calls yield nothing.
    """

    def __init__(self, count: Records, record_size: RecordSize):
        self.record_size = record_size
        self.count = count.value
        # Next key to hand out; advanced by one for every generated row.
        self.current_key = 0

    def max_key(self) -> int:
        """Return ``count``, one past the largest key ``generate`` emits.

        Raises:
            ValueError: if ``count`` is below 1 (i.e. ``Records.ALL``).
        """
        if self.count >= 1:
            return self.count
        raise ValueError(
            f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
        )

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield one single-row ``RowList`` per remaining key.

        Key fields receive ``numeric_value(current_key)``; every other
        field receives a random value of the configured record size.

        Raises:
            ValueError: on first iteration, if ``count`` is below 1.
        """
        if self.count < 1:
            raise ValueError(
                f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
            )

        for _ in range(self.count):
            # The counter may already be advanced by an earlier call.
            if self.current_key >= self.count:
                return

            key = self.current_key
            row_values = [
                column.data_type.numeric_value(key)
                if column.is_key
                else column.data_type.random_value(rng, self.record_size)
                for column in fields
            ]
            # Advance before yielding so the key is consumed even if the
            # caller stops iterating right after receiving this row.
            self.current_key += 1

            inserted = Row(
                fields=fields,
                values=row_values,
                operation=Operation.INSERT,
            )
            yield RowList([inserted])
class Upsert(Definition):
    """Definition that emits UPSERT rows, all targeting key 0.

    NOTE(review): ``keyspace`` is stored but ``generate`` never reads it;
    every row uses ``numeric_value(0)`` for its key fields regardless of
    the mode passed in -- confirm whether that is intended.
    """

    def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize):
        self.keyspace = keyspace
        self.record_size = record_size
        self.count = count.value

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield ``count`` single-row ``RowList`` objects.

        Key fields always receive ``numeric_value(0)``; every other field
        receives a fresh random value of the configured record size.

        Raises:
            ValueError: on first iteration, if ``count`` is below 1.
        """
        if self.count < 1:
            raise ValueError(
                f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
            )

        for _ in range(self.count):
            row_values = [
                column.data_type.numeric_value(0)
                if column.is_key
                else column.data_type.random_value(rng, self.record_size)
                for column in fields
            ]
            upserted = Row(
                fields=fields,
                values=row_values,
                operation=Operation.UPSERT,
            )
            yield RowList([upserted])
class Delete(Definition):
    """Definition that emits DELETE rows.

    Two modes, selected by ``number_of_records``:

    * ``Records.ALL`` -- delete keys ``numeric_value(0)`` through
      ``numeric_value(num - 1)``; ``num`` must be supplied.
    * any other ``Records`` member -- emit that member's ``value`` many
      deletes, each with random values for the key fields.

    In both modes ``values`` holds entries for the key fields only, while
    the full ``fields`` list is passed through to ``Row``.
    """

    def __init__(
        self,
        number_of_records: Records,
        record_size: RecordSize,
        num: int | None = None,
    ):
        self.number_of_records = number_of_records
        self.record_size = record_size
        # Number of keys to delete; only read when deleting Records.ALL.
        self.num = num

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield one single-row ``RowList`` per delete.

        Raises:
            ValueError: on first iteration, if ``number_of_records`` is
                not a ``Records`` member, or if it is ``Records.ALL`` and
                ``num`` was not provided.
        """
        requested = self.number_of_records
        if not isinstance(requested, Records):
            raise ValueError(f"Unexpected number of records {requested}")

        key_fields = [field for field in fields if field.is_key]

        if requested is Records.ALL:
            # Explicit check instead of `assert`, which is stripped when
            # Python runs with -O and would leave range(None) to fail.
            if self.num is None:
                raise ValueError(
                    "Deleting Records.ALL requires `num`, the number of keys to delete"
                )
            for key in range(self.num):
                values = [field.data_type.numeric_value(key) for field in key_fields]
                yield RowList([Row(fields, values, Operation.DELETE)])
            return

        # Every remaining member (ONE, HUNDRED, SOME, MANY, ...) carries
        # its row count as its value, so no member needs special-casing.
        for _ in range(requested.value):
            values = [
                field.data_type.random_value(rng, self.record_size)
                for field in key_fields
            ]
            yield RowList([Row(fields, values, Operation.DELETE)])
Classes
class Definition
-
Expand source code Browse git
class Definition: def generate(self, fields: list[Field]) -> Iterator[RowList]: raise NotImplementedError
Subclasses
Methods
def generate(self, fields: list[Field]) ‑> collections.abc.Iterator[RowList]
-
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]: raise NotImplementedError
class Delete (number_of_records: Records, record_size: RecordSize, num: int | None = None)
-
Expand source code Browse git
class Delete(Definition): def __init__( self, number_of_records: Records, record_size: RecordSize, num: int | None = None, ): self.number_of_records = number_of_records self.record_size = record_size self.num = num def generate(self, fields: list[Field]) -> Iterator[RowList]: if self.number_of_records == Records.ONE: values = [ field.data_type.random_value(rng, self.record_size) for field in fields if field.is_key ] yield RowList([Row(fields, values, Operation.DELETE)]) elif self.number_of_records in (Records.SOME, Records.MANY): for i in range(self.number_of_records.value): values = [ field.data_type.random_value(rng, self.record_size) for field in fields if field.is_key ] yield RowList([Row(fields, values, Operation.DELETE)]) elif self.number_of_records == Records.ALL: assert self.num is not None for i in range(self.num): values = [ field.data_type.numeric_value(i) for field in fields if field.is_key ] yield RowList([Row(fields, values, Operation.DELETE)]) else: raise ValueError(f"Unexpected number of records {self.number_of_records}")
Ancestors
Methods
def generate(self, fields: list[Field]) ‑> collections.abc.Iterator[RowList]
-
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]: if self.number_of_records == Records.ONE: values = [ field.data_type.random_value(rng, self.record_size) for field in fields if field.is_key ] yield RowList([Row(fields, values, Operation.DELETE)]) elif self.number_of_records in (Records.SOME, Records.MANY): for i in range(self.number_of_records.value): values = [ field.data_type.random_value(rng, self.record_size) for field in fields if field.is_key ] yield RowList([Row(fields, values, Operation.DELETE)]) elif self.number_of_records == Records.ALL: assert self.num is not None for i in range(self.num): values = [ field.data_type.numeric_value(i) for field in fields if field.is_key ] yield RowList([Row(fields, values, Operation.DELETE)]) else: raise ValueError(f"Unexpected number of records {self.number_of_records}")
class Insert (count: Records, record_size: RecordSize)
-
Expand source code Browse git
class Insert(Definition): def __init__(self, count: Records, record_size: RecordSize): self.count = count.value self.record_size = record_size self.current_key = 0 def max_key(self) -> int: if self.count < 1: raise ValueError( f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values' ) return self.count def generate(self, fields: list[Field]) -> Iterator[RowList]: if self.count < 1: raise ValueError( f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values' ) for i in range(self.count): if self.current_key >= self.count: break values = [ field.data_type.numeric_value(self.current_key) if field.is_key else field.data_type.random_value(rng, self.record_size) for field in fields ] self.current_key += 1 yield RowList( [ Row( fields=fields, values=values, operation=Operation.INSERT, ) ] )
Ancestors
Methods
def generate(self, fields: list[Field]) ‑> collections.abc.Iterator[RowList]
-
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]: if self.count < 1: raise ValueError( f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values' ) for i in range(self.count): if self.current_key >= self.count: break values = [ field.data_type.numeric_value(self.current_key) if field.is_key else field.data_type.random_value(rng, self.record_size) for field in fields ] self.current_key += 1 yield RowList( [ Row( fields=fields, values=values, operation=Operation.INSERT, ) ] )
def max_key(self) ‑> int
-
Expand source code Browse git
def max_key(self) -> int: if self.count < 1: raise ValueError( f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values' ) return self.count
class Keyspace (*args, **kwds)
-
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3
Access them by:
- attribute access:
>>> Color.RED <Color.RED: 1>
- value lookup:
>>> Color(1) <Color.RED: 1>
- name lookup:
>>> Color['RED'] <Color.RED: 1>
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3
>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Expand source code Browse git
class Keyspace(Enum): SINGLE_VALUE = 1 LARGE = 2 EXISTING = 3
Ancestors
- enum.Enum
Class variables
var EXISTING
var LARGE
var SINGLE_VALUE
class Records (*args, **kwds)
-
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3
Access them by:
- attribute access:
>>> Color.RED <Color.RED: 1>
- value lookup:
>>> Color(1) <Color.RED: 1>
- name lookup:
>>> Color['RED'] <Color.RED: 1>
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3
>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Expand source code Browse git
class Records(Enum): ALL = 0 # Only applies to DELETE operations ONE = 1 HUNDRED = 100 SOME = 1_000 MANY = 1_000_000
Ancestors
- enum.Enum
Class variables
var ALL
var HUNDRED
var MANY
var ONE
var SOME
class Target (*args, **kwds)
-
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3
Access them by:
- attribute access:
>>> Color.RED <Color.RED: 1>
- value lookup:
>>> Color(1) <Color.RED: 1>
- name lookup:
>>> Color['RED'] <Color.RED: 1>
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3
>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Expand source code Browse git
class Target(Enum): KAFKA = 1 POSTGRES = 2 PRINT = 3
Ancestors
- enum.Enum
Class variables
var KAFKA
var POSTGRES
var PRINT
class Upsert (keyspace: Keyspace, count: Records, record_size: RecordSize)
-
Expand source code Browse git
class Upsert(Definition): def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize): self.keyspace = keyspace self.count = count.value self.record_size = record_size def generate(self, fields: list[Field]) -> Iterator[RowList]: if self.count < 1: raise ValueError( f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values' ) for i in range(self.count): values = [ field.data_type.numeric_value(0) if field.is_key else field.data_type.random_value(rng, self.record_size) for field in fields ] yield RowList( [ Row( fields=fields, values=values, operation=Operation.UPSERT, ) ] )
Ancestors
Methods
def generate(self, fields: list[Field]) ‑> collections.abc.Iterator[RowList]
-
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]: if self.count < 1: raise ValueError( f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values' ) for i in range(self.count): values = [ field.data_type.numeric_value(0) if field.is_key else field.data_type.random_value(rng, self.record_size) for field in fields ] yield RowList( [ Row( fields=fields, values=values, operation=Operation.UPSERT, ) ] )