misc.python.materialize.workload_replay.column

Column class for data generation in workload replay.

  1# Copyright Materialize, Inc. and contributors. All rights reserved.
  2#
  3# Use of this software is governed by the Business Source License
  4# included in the LICENSE file at the root of this repository.
  5#
  6# As of the Change Date specified in that file, in accordance with
  7# the Business Source License, use of this software will be governed
  8# by the Apache License, Version 2.0.
  9
 10"""
 11Column class for data generation in workload replay.
 12"""
 13
 14from __future__ import annotations
 15
 16import json
 17import random
 18import string
 19import uuid
 20from typing import Any
 21
 22from pg8000.native import literal
 23
 24from materialize.workload_replay.util import (
 25    long_tail_float,
 26    long_tail_int,
 27    long_tail_rank,
 28    long_tail_text,
 29)
 30
 31
 32class Column:
 33    """Represents a column with type information and data generation capabilities."""
 34
 35    def __init__(
 36        self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None
 37    ):
 38        self.name = name
 39        self.typ = typ
 40        self.nullable = nullable
 41        self.default = default
 42        self.chars = string.ascii_letters + string.digits
 43        self.data_shape = data_shape
 44
 45        self._years = list(range(2019, 2026))
 46        self._seq_counter = 0
 47
 48        self._hot_strings = [
 49            f"{name}_a",
 50            f"{name}_b",
 51            f"{name}_c",
 52            "foo",
 53            "bar",
 54            "baz",
 55            "0",
 56            "1",
 57            "NULL",
 58        ]
 59
 60    def _shaped_text(self, rng: random.Random) -> str | None:
 61        """Generate text according to data_shape, or None if not applicable."""
 62        if self.data_shape == "datetime":
 63            return self._random_datetime(rng)
 64        elif self.data_shape == "random":
 65            length = rng.randrange(5, 40)
 66            return "".join(rng.choice(self.chars) for _ in range(length))
 67        elif self.data_shape == "uuid":
 68            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
 69        elif self.data_shape == "sequential":
 70            self._seq_counter += 1
 71            return f"{self.name}_{self._seq_counter}"
 72        elif self.data_shape == "zipfian":
 73            rank = long_tail_rank(n=10000, a=1.3, rng=rng)
 74            return f"{self.name}_{rank}"
 75        elif self.data_shape is not None and self.data_shape != "duration":
 76            raise ValueError(f"Unhandled data_shape {self.data_shape!r}")
 77        return None
 78
 79    def _shaped_float(self, rng: random.Random) -> float | None:
 80        """Generate a float according to data_shape, or None if not applicable."""
 81        if self.data_shape == "duration":
 82            return round(rng.uniform(10.0, 1800.0), 2)
 83        return None
 84
 85    def _random_date(self, rng: random.Random) -> str:
 86        """Generate a uniformly random date string."""
 87        year = rng.choice(self._years)
 88        return f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}"
 89
 90    def _random_datetime(self, rng: random.Random) -> str:
 91        """Generate a uniformly random datetime string."""
 92        return (
 93            f"{self._random_date(rng)}"
 94            f"T{rng.randrange(0, 24):02}:{rng.randrange(0, 60):02}:{rng.randrange(0, 60):02}Z"
 95        )
 96
 97    def avro_type(self) -> str | list[str]:
 98        """Return the Avro type for this column."""
 99        result = self.typ
100        if self.typ in ("text", "bytea", "character", "character varying"):
101            result = "string"
102        elif self.typ in ("smallint", "integer", "uint2", "uint4"):
103            result = "int"
104        elif self.typ in ("bigint", "uint8"):
105            result = "long"
106        elif self.typ in ("double precision", "numeric"):
107            result = "double"
108        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
109            result = "long"
110        return ["null", result] if self.nullable else result
111
112    def kafka_value(self, rng: random.Random) -> Any:
113        """Generate a value suitable for Kafka serialization."""
114        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
115            return str(self.default)
116        if self.nullable and rng.randrange(10) == 0:
117            return None
118
119        if self.typ == "boolean":
120            return rng.random() < 0.2
121
122        elif self.typ == "smallint":
123            return long_tail_int(-32768, 32767, rng=rng)
124        elif self.typ == "integer":
125            return long_tail_int(-2147483648, 2147483647, rng=rng)
126        elif self.typ == "bigint":
127            return long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
128
129        elif self.typ == "uint2":
130            return long_tail_int(0, 65535, rng=rng)
131        elif self.typ == "uint4":
132            return long_tail_int(0, 4294967295, rng=rng)
133        elif self.typ == "uint8":
134            return long_tail_int(0, 18446744073709551615, rng=rng)
135
136        elif self.typ in ("float", "double precision", "numeric"):
137            shaped = self._shaped_float(rng)
138            if shaped is not None:
139                return shaped
140            return long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
141
142        elif self.typ in ("text", "bytea"):
143            shaped = self._shaped_text(rng)
144            if shaped is not None:
145                return literal(shaped)
146            return literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
147
148        elif self.typ in ("character", "character varying"):
149            shaped = self._shaped_text(rng)
150            if shaped is not None:
151                return literal(shaped)
152            return literal(long_tail_text(self.chars, 10, self._hot_strings, rng=rng))
153
154        elif self.typ == "uuid":
155            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
156
157        elif self.typ == "jsonb":
158            result = {
159                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
160            }
161            return json.dumps(result)
162
163        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
164            # Epoch millis spread uniformly across 2019–2025
165            # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
166            return rng.randrange(1546300800000, 1767225600000)
167
168        elif self.typ == "mz_timestamp":
169            return literal(self._random_date(rng))
170
171        elif self.typ == "date":
172            return literal(self._random_date(rng))
173
174        elif self.typ == "time":
175            if rng.random() < 0.8:
176                common = ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
177                return literal(rng.choice(common))
178            return literal(
179                f"{rng.randrange(0, 24)}:{rng.randrange(0, 60)}:{rng.randrange(0, 60)}.{rng.randrange(0, 1000000)}"
180            )
181
182        elif self.typ == "int2range":
183            a = long_tail_int(-32768, 32767, rng=rng)
184            b = long_tail_int(-32768, 32767, rng=rng)
185            lo, hi = min(a, b), max(a, b)
186            return literal(f"[{lo},{hi})")
187
188        elif self.typ == "int4range":
189            a = long_tail_int(-2147483648, 2147483647, rng=rng)
190            b = long_tail_int(-2147483648, 2147483647, rng=rng)
191            lo, hi = min(a, b), max(a, b)
192            return literal(f"[{lo},{hi})")
193
194        elif self.typ == "int8range":
195            a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
196            b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
197            lo, hi = min(a, b), max(a, b)
198            return literal(f"[{lo},{hi})")
199
200        elif self.typ == "map":
201            return {
202                str(i): str(long_tail_int(-100, 100, rng=rng)) for i in range(0, 20)
203            }
204
205        elif self.typ == "text[]":
206            values = [
207                literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
208                for _ in range(5)
209            ]
210            return literal(f"{{{', '.join(values)}}}")
211
212        else:
213            raise ValueError(f"Unhandled data type {self.typ}")
214
215    def value(self, rng: random.Random, in_query: bool = True) -> Any:
216        """Generate a value suitable for SQL queries or COPY operations."""
217        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
218            return str(self.default) if in_query else self.default
219
220        if self.nullable and rng.randrange(10) == 0:
221            return "NULL" if in_query else None
222
223        if self.typ == "boolean":
224            val = rng.random() < 0.2
225            return ("true" if val else "false") if in_query else val
226
227        elif self.typ == "smallint":
228            val = long_tail_int(-32768, 32767, rng=rng)
229            return str(val) if in_query else val
230
231        elif self.typ == "integer":
232            val = long_tail_int(-2147483648, 2147483647, rng=rng)
233            return str(val) if in_query else val
234
235        elif self.typ == "bigint":
236            val = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
237            return str(val) if in_query else val
238
239        elif self.typ == "uint2":
240            val = long_tail_int(0, 65535, rng=rng)
241            return str(val) if in_query else val
242
243        elif self.typ == "uint4":
244            val = long_tail_int(0, 4294967295, rng=rng)
245            return str(val) if in_query else val
246
247        elif self.typ == "uint8":
248            val = long_tail_int(0, 18446744073709551615, rng=rng)
249            return str(val) if in_query else val
250
251        elif self.typ in ("float", "double precision", "numeric"):
252            shaped = self._shaped_float(rng)
253            if shaped is not None:
254                return str(shaped) if in_query else shaped
255            val = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
256            return str(val) if in_query else val
257
258        elif self.typ in ("text", "bytea"):
259            shaped = self._shaped_text(rng)
260            if shaped is not None:
261                return literal(shaped) if in_query else shaped
262            s = long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
263            return literal(s) if in_query else s
264
265        elif self.typ in ("character", "character varying"):
266            shaped = self._shaped_text(rng)
267            if shaped is not None:
268                return literal(shaped) if in_query else shaped
269            s = long_tail_text(self.chars, 10, self._hot_strings, rng=rng)
270            return literal(s) if in_query else s
271
272        elif self.typ == "uuid":
273            u = uuid.UUID(int=rng.getrandbits(128), version=4)
274            return str(u) if in_query else u
275
276        elif self.typ == "jsonb":
277            obj = {
278                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
279            }
280            if in_query:
281                return f"'{json.dumps(obj)}'::jsonb"
282            else:
283                return json.dumps(obj)
284
285        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
286            s = self._random_date(rng)
287            return literal(s) if in_query else s
288
289        elif self.typ == "mz_timestamp":
290            s = self._random_date(rng)
291            return literal(s) if in_query else s
292
293        elif self.typ == "date":
294            s = self._random_date(rng)
295            return literal(s) if in_query else s
296
297        elif self.typ == "time":
298            if rng.random() < 0.8:
299                s = rng.choice(
300                    ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
301                )
302                return literal(s) if in_query else s
303
304            s = (
305                f"{rng.randrange(0, 24)}:{rng.randrange(0, 60)}:{rng.randrange(0, 60)}"
306                f".{rng.randrange(0, 1000000)}"
307            )
308            return literal(s) if in_query else s
309
310        elif self.typ == "int2range":
311            a = long_tail_int(-32768, 32767, rng=rng)
312            b = long_tail_int(-32768, 32767, rng=rng)
313            lo, hi = min(a, b), max(a, b)
314            s = f"[{lo},{hi})"
315            return literal(s) if in_query else s
316
317        elif self.typ == "int4range":
318            a = long_tail_int(-2147483648, 2147483647, rng=rng)
319            b = long_tail_int(-2147483648, 2147483647, rng=rng)
320            lo, hi = min(a, b), max(a, b)
321            s = f"[{lo},{hi})"
322            return literal(s) if in_query else s
323
324        elif self.typ == "int8range":
325            a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
326            b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
327            lo, hi = min(a, b), max(a, b)
328            s = f"[{lo},{hi})"
329            return literal(s) if in_query else s
330
331        elif self.typ == "map":
332            if in_query:
333                values = [
334                    f"'{i}' => {str(long_tail_int(-100, 100, rng=rng))}"
335                    for i in range(0, 20)
336                ]
337                return literal(f"{{{', '.join(values)}}}")
338            else:
339                # COPY text input for map expects the literal form too
340                values = [
341                    f'"{i}"=>"{str(long_tail_int(-100, 100, rng=rng))}"'
342                    for i in range(0, 20)
343                ]
344                return "{" + ",".join(values) + "}"
345
346        elif self.typ == "text[]":
347            if in_query:
348                values = [
349                    literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
350                    for _ in range(5)
351                ]
352                return literal(f"{{{', '.join(values)}}}")
353            else:
354                return [
355                    long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
356                    for _ in range(5)
357                ]
358
359        else:
360            # Custom data type, or not supported yet
361            return "NULL" if in_query else None
class Column:
 33class Column:
 34    """Represents a column with type information and data generation capabilities."""
 35
 36    def __init__(
 37        self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None
 38    ):
 39        self.name = name
 40        self.typ = typ
 41        self.nullable = nullable
 42        self.default = default
 43        self.chars = string.ascii_letters + string.digits
 44        self.data_shape = data_shape
 45
 46        self._years = list(range(2019, 2026))
 47        self._seq_counter = 0
 48
 49        self._hot_strings = [
 50            f"{name}_a",
 51            f"{name}_b",
 52            f"{name}_c",
 53            "foo",
 54            "bar",
 55            "baz",
 56            "0",
 57            "1",
 58            "NULL",
 59        ]
 60
 61    def _shaped_text(self, rng: random.Random) -> str | None:
 62        """Generate text according to data_shape, or None if not applicable."""
 63        if self.data_shape == "datetime":
 64            return self._random_datetime(rng)
 65        elif self.data_shape == "random":
 66            length = rng.randrange(5, 40)
 67            return "".join(rng.choice(self.chars) for _ in range(length))
 68        elif self.data_shape == "uuid":
 69            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
 70        elif self.data_shape == "sequential":
 71            self._seq_counter += 1
 72            return f"{self.name}_{self._seq_counter}"
 73        elif self.data_shape == "zipfian":
 74            rank = long_tail_rank(n=10000, a=1.3, rng=rng)
 75            return f"{self.name}_{rank}"
 76        elif self.data_shape is not None and self.data_shape != "duration":
 77            raise ValueError(f"Unhandled data_shape {self.data_shape!r}")
 78        return None
 79
 80    def _shaped_float(self, rng: random.Random) -> float | None:
 81        """Generate a float according to data_shape, or None if not applicable."""
 82        if self.data_shape == "duration":
 83            return round(rng.uniform(10.0, 1800.0), 2)
 84        return None
 85
 86    def _random_date(self, rng: random.Random) -> str:
 87        """Generate a uniformly random date string."""
 88        year = rng.choice(self._years)
 89        return f"{year}-{rng.randrange(1, 13):02}-{rng.randrange(1, 29):02}"
 90
 91    def _random_datetime(self, rng: random.Random) -> str:
 92        """Generate a uniformly random datetime string."""
 93        return (
 94            f"{self._random_date(rng)}"
 95            f"T{rng.randrange(0, 24):02}:{rng.randrange(0, 60):02}:{rng.randrange(0, 60):02}Z"
 96        )
 97
 98    def avro_type(self) -> str | list[str]:
 99        """Return the Avro type for this column."""
100        result = self.typ
101        if self.typ in ("text", "bytea", "character", "character varying"):
102            result = "string"
103        elif self.typ in ("smallint", "integer", "uint2", "uint4"):
104            result = "int"
105        elif self.typ in ("bigint", "uint8"):
106            result = "long"
107        elif self.typ in ("double precision", "numeric"):
108            result = "double"
109        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
110            result = "long"
111        return ["null", result] if self.nullable else result
112
113    def kafka_value(self, rng: random.Random) -> Any:
114        """Generate a value suitable for Kafka serialization."""
115        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
116            return str(self.default)
117        if self.nullable and rng.randrange(10) == 0:
118            return None
119
120        if self.typ == "boolean":
121            return rng.random() < 0.2
122
123        elif self.typ == "smallint":
124            return long_tail_int(-32768, 32767, rng=rng)
125        elif self.typ == "integer":
126            return long_tail_int(-2147483648, 2147483647, rng=rng)
127        elif self.typ == "bigint":
128            return long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
129
130        elif self.typ == "uint2":
131            return long_tail_int(0, 65535, rng=rng)
132        elif self.typ == "uint4":
133            return long_tail_int(0, 4294967295, rng=rng)
134        elif self.typ == "uint8":
135            return long_tail_int(0, 18446744073709551615, rng=rng)
136
137        elif self.typ in ("float", "double precision", "numeric"):
138            shaped = self._shaped_float(rng)
139            if shaped is not None:
140                return shaped
141            return long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
142
143        elif self.typ in ("text", "bytea"):
144            shaped = self._shaped_text(rng)
145            if shaped is not None:
146                return literal(shaped)
147            return literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
148
149        elif self.typ in ("character", "character varying"):
150            shaped = self._shaped_text(rng)
151            if shaped is not None:
152                return literal(shaped)
153            return literal(long_tail_text(self.chars, 10, self._hot_strings, rng=rng))
154
155        elif self.typ == "uuid":
156            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
157
158        elif self.typ == "jsonb":
159            result = {
160                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
161            }
162            return json.dumps(result)
163
164        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
165            # Epoch millis spread uniformly across 2019–2025
166            # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
167            return rng.randrange(1546300800000, 1767225600000)
168
169        elif self.typ == "mz_timestamp":
170            return literal(self._random_date(rng))
171
172        elif self.typ == "date":
173            return literal(self._random_date(rng))
174
175        elif self.typ == "time":
176            if rng.random() < 0.8:
177                common = ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
178                return literal(rng.choice(common))
179            return literal(
180                f"{rng.randrange(0, 24)}:{rng.randrange(0, 60)}:{rng.randrange(0, 60)}.{rng.randrange(0, 1000000)}"
181            )
182
183        elif self.typ == "int2range":
184            a = long_tail_int(-32768, 32767, rng=rng)
185            b = long_tail_int(-32768, 32767, rng=rng)
186            lo, hi = min(a, b), max(a, b)
187            return literal(f"[{lo},{hi})")
188
189        elif self.typ == "int4range":
190            a = long_tail_int(-2147483648, 2147483647, rng=rng)
191            b = long_tail_int(-2147483648, 2147483647, rng=rng)
192            lo, hi = min(a, b), max(a, b)
193            return literal(f"[{lo},{hi})")
194
195        elif self.typ == "int8range":
196            a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
197            b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
198            lo, hi = min(a, b), max(a, b)
199            return literal(f"[{lo},{hi})")
200
201        elif self.typ == "map":
202            return {
203                str(i): str(long_tail_int(-100, 100, rng=rng)) for i in range(0, 20)
204            }
205
206        elif self.typ == "text[]":
207            values = [
208                literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
209                for _ in range(5)
210            ]
211            return literal(f"{{{', '.join(values)}}}")
212
213        else:
214            raise ValueError(f"Unhandled data type {self.typ}")
215
216    def value(self, rng: random.Random, in_query: bool = True) -> Any:
217        """Generate a value suitable for SQL queries or COPY operations."""
218        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
219            return str(self.default) if in_query else self.default
220
221        if self.nullable and rng.randrange(10) == 0:
222            return "NULL" if in_query else None
223
224        if self.typ == "boolean":
225            val = rng.random() < 0.2
226            return ("true" if val else "false") if in_query else val
227
228        elif self.typ == "smallint":
229            val = long_tail_int(-32768, 32767, rng=rng)
230            return str(val) if in_query else val
231
232        elif self.typ == "integer":
233            val = long_tail_int(-2147483648, 2147483647, rng=rng)
234            return str(val) if in_query else val
235
236        elif self.typ == "bigint":
237            val = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
238            return str(val) if in_query else val
239
240        elif self.typ == "uint2":
241            val = long_tail_int(0, 65535, rng=rng)
242            return str(val) if in_query else val
243
244        elif self.typ == "uint4":
245            val = long_tail_int(0, 4294967295, rng=rng)
246            return str(val) if in_query else val
247
248        elif self.typ == "uint8":
249            val = long_tail_int(0, 18446744073709551615, rng=rng)
250            return str(val) if in_query else val
251
252        elif self.typ in ("float", "double precision", "numeric"):
253            shaped = self._shaped_float(rng)
254            if shaped is not None:
255                return str(shaped) if in_query else shaped
256            val = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
257            return str(val) if in_query else val
258
259        elif self.typ in ("text", "bytea"):
260            shaped = self._shaped_text(rng)
261            if shaped is not None:
262                return literal(shaped) if in_query else shaped
263            s = long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
264            return literal(s) if in_query else s
265
266        elif self.typ in ("character", "character varying"):
267            shaped = self._shaped_text(rng)
268            if shaped is not None:
269                return literal(shaped) if in_query else shaped
270            s = long_tail_text(self.chars, 10, self._hot_strings, rng=rng)
271            return literal(s) if in_query else s
272
273        elif self.typ == "uuid":
274            u = uuid.UUID(int=rng.getrandbits(128), version=4)
275            return str(u) if in_query else u
276
277        elif self.typ == "jsonb":
278            obj = {
279                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
280            }
281            if in_query:
282                return f"'{json.dumps(obj)}'::jsonb"
283            else:
284                return json.dumps(obj)
285
286        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
287            s = self._random_date(rng)
288            return literal(s) if in_query else s
289
290        elif self.typ == "mz_timestamp":
291            s = self._random_date(rng)
292            return literal(s) if in_query else s
293
294        elif self.typ == "date":
295            s = self._random_date(rng)
296            return literal(s) if in_query else s
297
298        elif self.typ == "time":
299            if rng.random() < 0.8:
300                s = rng.choice(
301                    ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
302                )
303                return literal(s) if in_query else s
304
305            s = (
306                f"{rng.randrange(0, 24)}:{rng.randrange(0, 60)}:{rng.randrange(0, 60)}"
307                f".{rng.randrange(0, 1000000)}"
308            )
309            return literal(s) if in_query else s
310
311        elif self.typ == "int2range":
312            a = long_tail_int(-32768, 32767, rng=rng)
313            b = long_tail_int(-32768, 32767, rng=rng)
314            lo, hi = min(a, b), max(a, b)
315            s = f"[{lo},{hi})"
316            return literal(s) if in_query else s
317
318        elif self.typ == "int4range":
319            a = long_tail_int(-2147483648, 2147483647, rng=rng)
320            b = long_tail_int(-2147483648, 2147483647, rng=rng)
321            lo, hi = min(a, b), max(a, b)
322            s = f"[{lo},{hi})"
323            return literal(s) if in_query else s
324
325        elif self.typ == "int8range":
326            a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
327            b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
328            lo, hi = min(a, b), max(a, b)
329            s = f"[{lo},{hi})"
330            return literal(s) if in_query else s
331
332        elif self.typ == "map":
333            if in_query:
334                values = [
335                    f"'{i}' => {str(long_tail_int(-100, 100, rng=rng))}"
336                    for i in range(0, 20)
337                ]
338                return literal(f"{{{', '.join(values)}}}")
339            else:
340                # COPY text input for map expects the literal form too
341                values = [
342                    f'"{i}"=>"{str(long_tail_int(-100, 100, rng=rng))}"'
343                    for i in range(0, 20)
344                ]
345                return "{" + ",".join(values) + "}"
346
347        elif self.typ == "text[]":
348            if in_query:
349                values = [
350                    literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
351                    for _ in range(5)
352                ]
353                return literal(f"{{{', '.join(values)}}}")
354            else:
355                return [
356                    long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
357                    for _ in range(5)
358                ]
359
360        else:
361            # Custom data type, or not supported yet
362            return "NULL" if in_query else None

Represents a column with type information and data generation capabilities.

Column(name: str, typ: str, nullable: bool, default: Any, data_shape: str | None)
36    def __init__(
37        self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None
38    ):
39        self.name = name
40        self.typ = typ
41        self.nullable = nullable
42        self.default = default
43        self.chars = string.ascii_letters + string.digits
44        self.data_shape = data_shape
45
46        self._years = list(range(2019, 2026))
47        self._seq_counter = 0
48
49        self._hot_strings = [
50            f"{name}_a",
51            f"{name}_b",
52            f"{name}_c",
53            "foo",
54            "bar",
55            "baz",
56            "0",
57            "1",
58            "NULL",
59        ]
name
typ
nullable
default
chars
data_shape
def avro_type(self) -> str | list[str]:
 98    def avro_type(self) -> str | list[str]:
 99        """Return the Avro type for this column."""
100        result = self.typ
101        if self.typ in ("text", "bytea", "character", "character varying"):
102            result = "string"
103        elif self.typ in ("smallint", "integer", "uint2", "uint4"):
104            result = "int"
105        elif self.typ in ("bigint", "uint8"):
106            result = "long"
107        elif self.typ in ("double precision", "numeric"):
108            result = "double"
109        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
110            result = "long"
111        return ["null", result] if self.nullable else result

Return the Avro type for this column.

def kafka_value(self, rng: random.Random) -> Any:
113    def kafka_value(self, rng: random.Random) -> Any:
114        """Generate a value suitable for Kafka serialization."""
115        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
116            return str(self.default)
117        if self.nullable and rng.randrange(10) == 0:
118            return None
119
120        if self.typ == "boolean":
121            return rng.random() < 0.2
122
123        elif self.typ == "smallint":
124            return long_tail_int(-32768, 32767, rng=rng)
125        elif self.typ == "integer":
126            return long_tail_int(-2147483648, 2147483647, rng=rng)
127        elif self.typ == "bigint":
128            return long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
129
130        elif self.typ == "uint2":
131            return long_tail_int(0, 65535, rng=rng)
132        elif self.typ == "uint4":
133            return long_tail_int(0, 4294967295, rng=rng)
134        elif self.typ == "uint8":
135            return long_tail_int(0, 18446744073709551615, rng=rng)
136
137        elif self.typ in ("float", "double precision", "numeric"):
138            shaped = self._shaped_float(rng)
139            if shaped is not None:
140                return shaped
141            return long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
142
143        elif self.typ in ("text", "bytea"):
144            shaped = self._shaped_text(rng)
145            if shaped is not None:
146                return literal(shaped)
147            return literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
148
149        elif self.typ in ("character", "character varying"):
150            shaped = self._shaped_text(rng)
151            if shaped is not None:
152                return literal(shaped)
153            return literal(long_tail_text(self.chars, 10, self._hot_strings, rng=rng))
154
155        elif self.typ == "uuid":
156            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
157
158        elif self.typ == "jsonb":
159            result = {
160                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
161            }
162            return json.dumps(result)
163
164        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
165            # Epoch millis spread uniformly across 2019–2025
166            # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
167            return rng.randrange(1546300800000, 1767225600000)
168
169        elif self.typ == "mz_timestamp":
170            return literal(self._random_date(rng))
171
172        elif self.typ == "date":
173            return literal(self._random_date(rng))
174
175        elif self.typ == "time":
176            if rng.random() < 0.8:
177                common = ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
178                return literal(rng.choice(common))
179            return literal(
180                f"{rng.randrange(0, 24)}:{rng.randrange(0, 60)}:{rng.randrange(0, 60)}.{rng.randrange(0, 1000000)}"
181            )
182
183        elif self.typ == "int2range":
184            a = long_tail_int(-32768, 32767, rng=rng)
185            b = long_tail_int(-32768, 32767, rng=rng)
186            lo, hi = min(a, b), max(a, b)
187            return literal(f"[{lo},{hi})")
188
189        elif self.typ == "int4range":
190            a = long_tail_int(-2147483648, 2147483647, rng=rng)
191            b = long_tail_int(-2147483648, 2147483647, rng=rng)
192            lo, hi = min(a, b), max(a, b)
193            return literal(f"[{lo},{hi})")
194
195        elif self.typ == "int8range":
196            a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
197            b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
198            lo, hi = min(a, b), max(a, b)
199            return literal(f"[{lo},{hi})")
200
201        elif self.typ == "map":
202            return {
203                str(i): str(long_tail_int(-100, 100, rng=rng)) for i in range(0, 20)
204            }
205
206        elif self.typ == "text[]":
207            values = [
208                literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
209                for _ in range(5)
210            ]
211            return literal(f"{{{', '.join(values)}}}")
212
213        else:
214            raise ValueError(f"Unhandled data type {self.typ}")

Generate a value suitable for Kafka serialization.

def value(self, rng: random.Random, in_query: bool = True) -> Any:
216    def value(self, rng: random.Random, in_query: bool = True) -> Any:
217        """Generate a value suitable for SQL queries or COPY operations."""
218        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
219            return str(self.default) if in_query else self.default
220
221        if self.nullable and rng.randrange(10) == 0:
222            return "NULL" if in_query else None
223
224        if self.typ == "boolean":
225            val = rng.random() < 0.2
226            return ("true" if val else "false") if in_query else val
227
228        elif self.typ == "smallint":
229            val = long_tail_int(-32768, 32767, rng=rng)
230            return str(val) if in_query else val
231
232        elif self.typ == "integer":
233            val = long_tail_int(-2147483648, 2147483647, rng=rng)
234            return str(val) if in_query else val
235
236        elif self.typ == "bigint":
237            val = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
238            return str(val) if in_query else val
239
240        elif self.typ == "uint2":
241            val = long_tail_int(0, 65535, rng=rng)
242            return str(val) if in_query else val
243
244        elif self.typ == "uint4":
245            val = long_tail_int(0, 4294967295, rng=rng)
246            return str(val) if in_query else val
247
248        elif self.typ == "uint8":
249            val = long_tail_int(0, 18446744073709551615, rng=rng)
250            return str(val) if in_query else val
251
252        elif self.typ in ("float", "double precision", "numeric"):
253            shaped = self._shaped_float(rng)
254            if shaped is not None:
255                return str(shaped) if in_query else shaped
256            val = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
257            return str(val) if in_query else val
258
259        elif self.typ in ("text", "bytea"):
260            shaped = self._shaped_text(rng)
261            if shaped is not None:
262                return literal(shaped) if in_query else shaped
263            s = long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
264            return literal(s) if in_query else s
265
266        elif self.typ in ("character", "character varying"):
267            shaped = self._shaped_text(rng)
268            if shaped is not None:
269                return literal(shaped) if in_query else shaped
270            s = long_tail_text(self.chars, 10, self._hot_strings, rng=rng)
271            return literal(s) if in_query else s
272
273        elif self.typ == "uuid":
274            u = uuid.UUID(int=rng.getrandbits(128), version=4)
275            return str(u) if in_query else u
276
277        elif self.typ == "jsonb":
278            obj = {
279                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
280            }
281            if in_query:
282                return f"'{json.dumps(obj)}'::jsonb"
283            else:
284                return json.dumps(obj)
285
286        elif self.typ in ("timestamp with time zone", "timestamp without time zone"):
287            s = self._random_date(rng)
288            return literal(s) if in_query else s
289
290        elif self.typ == "mz_timestamp":
291            s = self._random_date(rng)
292            return literal(s) if in_query else s
293
294        elif self.typ == "date":
295            s = self._random_date(rng)
296            return literal(s) if in_query else s
297
298        elif self.typ == "time":
299            if rng.random() < 0.8:
300                s = rng.choice(
301                    ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
302                )
303                return literal(s) if in_query else s
304
305            s = (
306                f"{rng.randrange(0, 24)}:{rng.randrange(0, 60)}:{rng.randrange(0, 60)}"
307                f".{rng.randrange(0, 1000000)}"
308            )
309            return literal(s) if in_query else s
310
311        elif self.typ == "int2range":
312            a = long_tail_int(-32768, 32767, rng=rng)
313            b = long_tail_int(-32768, 32767, rng=rng)
314            lo, hi = min(a, b), max(a, b)
315            s = f"[{lo},{hi})"
316            return literal(s) if in_query else s
317
318        elif self.typ == "int4range":
319            a = long_tail_int(-2147483648, 2147483647, rng=rng)
320            b = long_tail_int(-2147483648, 2147483647, rng=rng)
321            lo, hi = min(a, b), max(a, b)
322            s = f"[{lo},{hi})"
323            return literal(s) if in_query else s
324
325        elif self.typ == "int8range":
326            a = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
327            b = long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)
328            lo, hi = min(a, b), max(a, b)
329            s = f"[{lo},{hi})"
330            return literal(s) if in_query else s
331
332        elif self.typ == "map":
333            if in_query:
334                values = [
335                    f"'{i}' => {str(long_tail_int(-100, 100, rng=rng))}"
336                    for i in range(0, 20)
337                ]
338                return literal(f"{{{', '.join(values)}}}")
339            else:
340                # COPY text input for map expects the literal form too
341                values = [
342                    f'"{i}"=>"{str(long_tail_int(-100, 100, rng=rng))}"'
343                    for i in range(0, 20)
344                ]
345                return "{" + ",".join(values) + "}"
346
347        elif self.typ == "text[]":
348            if in_query:
349                values = [
350                    literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
351                    for _ in range(5)
352                ]
353                return literal(f"{{{', '.join(values)}}}")
354            else:
355                return [
356                    long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
357                    for _ in range(5)
358                ]
359
360        else:
361            # Custom data type, or not supported yet
362            return "NULL" if in_query else None

Generate a value suitable for SQL queries or COPY operations.