Module materialize.data_ingest.definition

Expand source code Browse git
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import random
from collections.abc import Iterator
from enum import Enum

from materialize.data_ingest.data_type import RecordSize
from materialize.data_ingest.field import Field
from materialize.data_ingest.row import Operation, Row
from materialize.data_ingest.rowlist import RowList

# Single generator shared by every random value produced in this module.
# ``random.Random()`` with no seed is seeded from OS randomness, so the
# generated data differs from run to run.
rng: random.Random = random.Random()


class Records(Enum):
    """Number of rows a definition should produce.

    For every member except ``ALL`` the value is the number of rows to
    produce. ``ALL`` (0) is meaningful only for deletes: ``Insert`` and
    ``Upsert`` raise ValueError when given it.
    """

    ALL = 0  #  Only applies to DELETE operations
    ONE = 1
    HUNDRED = 100
    SOME = 1_000
    MANY = 1_000_000


class Keyspace(Enum):
    """Key-space selector accepted by ``Upsert``.

    NOTE(review): member semantics are not established in this module.
    ``Upsert`` stores the value, but its ``generate`` does not read it.
    Confirm the intended meaning before relying on it.
    """

    SINGLE_VALUE = 1
    LARGE = 2
    EXISTING = 3


class Target(Enum):
    """Destination kind for generated rows.

    NOTE(review): not referenced elsewhere in this module. Presumably it
    selects where generated ``RowList``s are sent (Kafka, Postgres, or
    printed output); confirm against callers.
    """

    KAFKA = 1
    POSTGRES = 2
    PRINT = 3


class Definition:
    """Base class for the row-generating definitions in this module.

    Subclasses (``Insert``, ``Upsert``, ``Delete``) override ``generate``.
    """

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield ``RowList`` batches for ``fields``; must be overridden."""
        raise NotImplementedError()


class Insert(Definition):
    """Insert ``count`` rows with sequential keys.

    Key fields receive ``numeric_value(k)`` for k = 0, 1, ..., count - 1.
    Every other field receives a random value of size ``record_size``.
    Progress is kept in ``current_key`` on the instance, so keys already
    emitted are not emitted again by a later call to ``generate``.
    """

    def __init__(self, count: Records, record_size: RecordSize):
        # Only the enum's numeric value is retained.
        self.count = count.value
        self.record_size = record_size
        # Index of the next key to emit; advanced by ``generate``.
        self.current_key = 0

    def max_key(self) -> int:
        """Return ``count``, one past the largest key index ``generate`` emits.

        Raises:
            ValueError: if ``count`` is below 1 (i.e. ``Records.ALL``).
        """
        if self.count >= 1:
            return self.count
        raise ValueError(
            f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
        )

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield one single-row INSERT ``RowList`` per remaining key.

        Raises:
            ValueError: on first iteration, if ``count`` is below 1.
        """
        if self.count < 1:
            raise ValueError(
                f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
            )

        def value_for(field, key):
            # Key columns get a deterministic value derived from ``key``;
            # all other columns are random.
            if field.is_key:
                return field.data_type.numeric_value(key)
            return field.data_type.random_value(rng, self.record_size)

        for _ in range(self.count):
            key = self.current_key
            if key >= self.count:
                # Every key has already been emitted (possibly by an earlier call).
                break
            row_values = [value_for(field, key) for field in fields]
            self.current_key = key + 1
            yield RowList(
                [Row(fields=fields, values=row_values, operation=Operation.INSERT)]
            )


class Upsert(Definition):
    """Upsert ``count`` rows that all share the same key.

    Every key field is set to ``numeric_value(0)``. Every other field gets a
    fresh random value of size ``record_size``.
    """

    def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize):
        # NOTE(review): ``keyspace`` is stored but ``generate`` never reads
        # it; keys are always ``numeric_value(0)``. Confirm this is intended.
        self.keyspace = keyspace
        self.count = count.value
        self.record_size = record_size

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield ``count`` single-row UPSERT ``RowList``s keyed by ``numeric_value(0)``.

        Raises:
            ValueError: on first iteration, if ``count`` is below 1.
        """
        if self.count < 1:
            raise ValueError(
                f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
            )

        def value_for(field):
            if field.is_key:
                return field.data_type.numeric_value(0)
            return field.data_type.random_value(rng, self.record_size)

        for _ in range(self.count):
            row_values = [value_for(field) for field in fields]
            yield RowList(
                [Row(fields=fields, values=row_values, operation=Operation.UPSERT)]
            )


class Delete(Definition):
    """Delete rows by key.

    With ``Records.ALL``, one delete is emitted for each of the keys
    ``numeric_value(0)`` .. ``numeric_value(num - 1)``. With any other
    ``Records`` member, the member's value is the number of deletes emitted,
    each targeting a random key value of size ``record_size``.
    """

    def __init__(
        self,
        number_of_records: Records,
        record_size: RecordSize,
        num: int | None = None,
    ):
        self.number_of_records = number_of_records
        self.record_size = record_size
        # Number of sequential keys to delete; required for Records.ALL.
        self.num = num

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield one single-row DELETE ``RowList`` per key to delete.

        Only key fields contribute values to each row.

        Raises:
            ValueError: on first iteration, if ``number_of_records`` is
                ``Records.ALL`` but ``num`` is None, or if it is not a
                ``Records`` member.
        """
        # NOTE(review): ``values`` holds only key-field values while ``fields``
        # is the full list; presumably ``Row`` reads only key fields for
        # DELETE. Confirm against Row.
        if self.number_of_records == Records.ALL:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if self.num is None:
                raise ValueError("Records.ALL requires num to be set")
            for i in range(self.num):
                values = [
                    field.data_type.numeric_value(i) for field in fields if field.is_key
                ]
                yield RowList([Row(fields, values, Operation.DELETE)])
        elif isinstance(self.number_of_records, Records):
            # ONE, HUNDRED, SOME and MANY all mean "delete that many random
            # keys". HUNDRED is a valid count for Insert/Upsert and must be
            # accepted here too.
            for _ in range(self.number_of_records.value):
                values = [
                    field.data_type.random_value(rng, self.record_size)
                    for field in fields
                    if field.is_key
                ]
                yield RowList([Row(fields, values, Operation.DELETE)])
        else:
            raise ValueError(f"Unexpected number of records {self.number_of_records}")

Classes

class Definition
Expand source code Browse git
class Definition:
    """Base class for the row-generating definitions in this module.

    Subclasses (``Insert``, ``Upsert``, ``Delete``) override ``generate``.
    """

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield ``RowList`` batches for ``fields``; must be overridden."""
        raise NotImplementedError()

Subclasses

  • Delete
  • Insert
  • Upsert

Methods

def generate(self, fields: list[Field]) -> collections.abc.Iterator[RowList]
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]:
    """Yield ``RowList`` batches for ``fields``; must be overridden."""
    raise NotImplementedError()
class Delete (number_of_records: Records, record_size: RecordSize, num: int | None = None)
Expand source code Browse git
class Delete(Definition):
    """Delete rows by key.

    With ``Records.ALL``, one delete is emitted for each of the keys
    ``numeric_value(0)`` .. ``numeric_value(num - 1)``. With any other
    ``Records`` member, the member's value is the number of deletes emitted,
    each targeting a random key value of size ``record_size``.
    """

    def __init__(
        self,
        number_of_records: Records,
        record_size: RecordSize,
        num: int | None = None,
    ):
        self.number_of_records = number_of_records
        self.record_size = record_size
        # Number of sequential keys to delete; required for Records.ALL.
        self.num = num

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield one single-row DELETE ``RowList`` per key to delete.

        Only key fields contribute values to each row.

        Raises:
            ValueError: on first iteration, if ``number_of_records`` is
                ``Records.ALL`` but ``num`` is None, or if it is not a
                ``Records`` member.
        """
        # NOTE(review): ``values`` holds only key-field values while ``fields``
        # is the full list; presumably ``Row`` reads only key fields for
        # DELETE. Confirm against Row.
        if self.number_of_records == Records.ALL:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            if self.num is None:
                raise ValueError("Records.ALL requires num to be set")
            for i in range(self.num):
                values = [
                    field.data_type.numeric_value(i) for field in fields if field.is_key
                ]
                yield RowList([Row(fields, values, Operation.DELETE)])
        elif isinstance(self.number_of_records, Records):
            # ONE, HUNDRED, SOME and MANY all mean "delete that many random
            # keys". HUNDRED is a valid count for Insert/Upsert and must be
            # accepted here too.
            for _ in range(self.number_of_records.value):
                values = [
                    field.data_type.random_value(rng, self.record_size)
                    for field in fields
                    if field.is_key
                ]
                yield RowList([Row(fields, values, Operation.DELETE)])
        else:
            raise ValueError(f"Unexpected number of records {self.number_of_records}")

Ancestors

  • Definition

Methods

def generate(self, fields: list[Field]) -> collections.abc.Iterator[RowList]
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]:
    """Yield one single-row DELETE ``RowList`` per key to delete.

    With ``Records.ALL``, the keys ``numeric_value(0)`` ..
    ``numeric_value(num - 1)`` are deleted. With any other ``Records``
    member, the member's value is the number of deletes, each targeting a
    random key value. Only key fields contribute values to each row.

    Raises:
        ValueError: on first iteration, if ``number_of_records`` is
            ``Records.ALL`` but ``num`` is None, or if it is not a
            ``Records`` member.
    """
    # NOTE(review): ``values`` holds only key-field values while ``fields``
    # is the full list; presumably ``Row`` reads only key fields for
    # DELETE. Confirm against Row.
    if self.number_of_records == Records.ALL:
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if self.num is None:
            raise ValueError("Records.ALL requires num to be set")
        for i in range(self.num):
            values = [
                field.data_type.numeric_value(i) for field in fields if field.is_key
            ]
            yield RowList([Row(fields, values, Operation.DELETE)])
    elif isinstance(self.number_of_records, Records):
        # ONE, HUNDRED, SOME and MANY all mean "delete that many random
        # keys". HUNDRED is a valid count for Insert/Upsert and must be
        # accepted here too.
        for _ in range(self.number_of_records.value):
            values = [
                field.data_type.random_value(rng, self.record_size)
                for field in fields
                if field.is_key
            ]
            yield RowList([Row(fields, values, Operation.DELETE)])
    else:
        raise ValueError(f"Unexpected number of records {self.number_of_records}")
class Insert (count: Records, record_size: RecordSize)
Expand source code Browse git
class Insert(Definition):
    """Insert ``count`` rows with sequential keys.

    Key fields receive ``numeric_value(k)`` for k = 0, 1, ..., count - 1.
    Every other field receives a random value of size ``record_size``.
    Progress is kept in ``current_key`` on the instance, so keys already
    emitted are not emitted again by a later call to ``generate``.
    """

    def __init__(self, count: Records, record_size: RecordSize):
        # Only the enum's numeric value is retained.
        self.count = count.value
        self.record_size = record_size
        # Index of the next key to emit; advanced by ``generate``.
        self.current_key = 0

    def max_key(self) -> int:
        """Return ``count``, one past the largest key index ``generate`` emits.

        Raises:
            ValueError: if ``count`` is below 1 (i.e. ``Records.ALL``).
        """
        if self.count >= 1:
            return self.count
        raise ValueError(
            f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
        )

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield one single-row INSERT ``RowList`` per remaining key.

        Raises:
            ValueError: on first iteration, if ``count`` is below 1.
        """
        if self.count < 1:
            raise ValueError(
                f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
            )

        def value_for(field, key):
            # Key columns get a deterministic value derived from ``key``;
            # all other columns are random.
            if field.is_key:
                return field.data_type.numeric_value(key)
            return field.data_type.random_value(rng, self.record_size)

        for _ in range(self.count):
            key = self.current_key
            if key >= self.count:
                # Every key has already been emitted (possibly by an earlier call).
                break
            row_values = [value_for(field, key) for field in fields]
            self.current_key = key + 1
            yield RowList(
                [Row(fields=fields, values=row_values, operation=Operation.INSERT)]
            )

Ancestors

  • Definition

Methods

def generate(self, fields: list[Field]) -> collections.abc.Iterator[RowList]
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]:
    """Yield one single-row INSERT ``RowList`` per remaining key.

    Key fields receive ``numeric_value(current_key)``; other fields receive
    a random value of size ``record_size``. ``current_key`` advances on the
    instance, so keys already emitted are skipped by later calls.

    Raises:
        ValueError: on first iteration, if ``count`` is below 1.
    """
    if self.count < 1:
        raise ValueError(
            f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
        )

    def value_for(field, key):
        # Key columns get a deterministic value derived from ``key``;
        # all other columns are random.
        if field.is_key:
            return field.data_type.numeric_value(key)
        return field.data_type.random_value(rng, self.record_size)

    for _ in range(self.count):
        key = self.current_key
        if key >= self.count:
            # Every key has already been emitted (possibly by an earlier call).
            break
        row_values = [value_for(field, key) for field in fields]
        self.current_key = key + 1
        yield RowList(
            [Row(fields=fields, values=row_values, operation=Operation.INSERT)]
        )
def max_key(self) -> int
Expand source code Browse git
def max_key(self) -> int:
    """Return ``count``, one past the largest key index ``generate`` emits.

    Raises:
        ValueError: if ``count`` is below 1 (i.e. ``Records.ALL``).
    """
    if self.count >= 1:
        return self.count
    raise ValueError(
        f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
    )
class Keyspace (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class Keyspace(Enum):
    """Key-space selector accepted by ``Upsert``.

    NOTE(review): member semantics are not established in this module.
    ``Upsert`` stores the value, but its ``generate`` does not read it.
    Confirm the intended meaning before relying on it.
    """

    SINGLE_VALUE = 1
    LARGE = 2
    EXISTING = 3

Ancestors

  • enum.Enum

Class variables

var EXISTING
var LARGE
var SINGLE_VALUE
class Records (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class Records(Enum):
    """Number of rows a definition should produce.

    For every member except ``ALL`` the value is the number of rows to
    produce. ``ALL`` (0) is meaningful only for deletes: ``Insert`` and
    ``Upsert`` raise ValueError when given it.
    """

    ALL = 0  #  Only applies to DELETE operations
    ONE = 1
    HUNDRED = 100
    SOME = 1_000
    MANY = 1_000_000

Ancestors

  • enum.Enum

Class variables

var ALL
var HUNDRED
var MANY
var ONE
var SOME
class Target (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code Browse git
class Target(Enum):
    """Destination kind for generated rows.

    NOTE(review): not referenced elsewhere in this module. Presumably it
    selects where generated ``RowList``s are sent (Kafka, Postgres, or
    printed output); confirm against callers.
    """

    KAFKA = 1
    POSTGRES = 2
    PRINT = 3

Ancestors

  • enum.Enum

Class variables

var KAFKA
var POSTGRES
var PRINT
class Upsert (keyspace: Keyspace, count: Records, record_size: RecordSize)
Expand source code Browse git
class Upsert(Definition):
    """Upsert ``count`` rows that all share the same key.

    Every key field is set to ``numeric_value(0)``. Every other field gets a
    fresh random value of size ``record_size``.
    """

    def __init__(self, keyspace: Keyspace, count: Records, record_size: RecordSize):
        # NOTE(review): ``keyspace`` is stored but ``generate`` never reads
        # it; keys are always ``numeric_value(0)``. Confirm this is intended.
        self.keyspace = keyspace
        self.count = count.value
        self.record_size = record_size

    def generate(self, fields: list[Field]) -> Iterator[RowList]:
        """Yield ``count`` single-row UPSERT ``RowList``s keyed by ``numeric_value(0)``.

        Raises:
            ValueError: on first iteration, if ``count`` is below 1.
        """
        if self.count < 1:
            raise ValueError(
                f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
            )

        def value_for(field):
            if field.is_key:
                return field.data_type.numeric_value(0)
            return field.data_type.random_value(rng, self.record_size)

        for _ in range(self.count):
            row_values = [value_for(field) for field in fields]
            yield RowList(
                [Row(fields=fields, values=row_values, operation=Operation.UPSERT)]
            )

Ancestors

  • Definition

Methods

def generate(self, fields: list[Field]) -> collections.abc.Iterator[RowList]
Expand source code Browse git
def generate(self, fields: list[Field]) -> Iterator[RowList]:
    """Yield ``count`` single-row UPSERT ``RowList``s keyed by ``numeric_value(0)``.

    Non-key fields receive a fresh random value of size ``record_size``.
    NOTE(review): ``self.keyspace`` is not consulted here; confirm intended.

    Raises:
        ValueError: on first iteration, if ``count`` is below 1.
    """
    if self.count < 1:
        raise ValueError(
            f'Unexpected count {self.count}, doesn\'t make sense to generate "ALL" values'
        )

    def value_for(field):
        if field.is_key:
            return field.data_type.numeric_value(0)
        return field.data_type.random_value(rng, self.record_size)

    for _ in range(self.count):
        row_values = [value_for(field) for field in fields]
        yield RowList(
            [Row(fields=fields, values=row_values, operation=Operation.UPSERT)]
        )