misc.python.materialize.workload_replay.column
Column class for data generation in workload replay.
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Column class for data generation in workload replay.
"""

from __future__ import annotations

import json
import random
import string
import uuid
from typing import Any

from pg8000.native import literal

from materialize.workload_replay.util import (
    long_tail_float,
    long_tail_int,
    long_tail_rank,
    long_tail_text,
)


class Column:
    """Represents a column with type information and data generation capabilities."""

    def __init__(
        self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None
    ):
        """Store the column's schema attributes and initialise generator state.

        Args:
            name: Column name; also used as a prefix for generated strings.
            typ: Type name that the generator methods dispatch on.
            nullable: Whether the generators may produce a null value.
            default: Column default; when truthy and not "NULL" it is
                occasionally returned by the generators.
            data_shape: Optional shape hint read by ``_shaped_text`` and
                ``_shaped_float``.
        """
        # Schema information, stored exactly as supplied.
        self.name = name
        self.typ = typ
        self.nullable = nullable
        self.default = default
        self.data_shape = data_shape

        # Alphabet handed to the random text generators.
        self.chars = string.ascii_letters + string.digits

        # Years sampled by _random_date: 2019 through 2025 inclusive.
        self._years = list(range(2019, 2026))
        # Last suffix handed out for the "sequential" data shape.
        self._seq_counter = 0

        # Frequently repeated strings passed to long_tail_text.
        self._hot_strings = [
            f"{name}_a",
            f"{name}_b",
            f"{name}_c",
            "foo",
            "bar",
            "baz",
            "0",
            "1",
            "NULL",
        ]

    def _shaped_text(self, rng: random.Random) -> str | None:
        """Generate text according to data_shape, or None if not applicable.

        Returns None when ``data_shape`` is None or "duration".

        Raises:
            ValueError: If ``data_shape`` is any other unrecognised value.
        """
        shape = self.data_shape
        if shape == "datetime":
            return self._random_datetime(rng)
        if shape == "random":
            length = rng.randrange(5, 40)
            return "".join(rng.choice(self.chars) for _ in range(length))
        if shape == "uuid":
            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
        if shape == "sequential":
            # Stateful: every call advances the per-column counter.
            self._seq_counter += 1
            return f"{self.name}_{self._seq_counter}"
        if shape == "zipfian":
            rank = long_tail_rank(n=10000, a=1.3, rng=rng)
            return f"{self.name}_{rank}"
        if shape not in (None, "duration"):
            raise ValueError(f"Unhandled data_shape {self.data_shape!r}")
        return None

    def _shaped_float(self, rng: random.Random) -> float | None:
        """Generate a float according to data_shape, or None if not applicable."""
        if self.data_shape != "duration":
            return None
        # Uniform draw between 10.0 and 1800.0, rounded to two decimals.
        return round(rng.uniform(10.0, 1800.0), 2)

    def _random_date(self, rng: random.Random) -> str:
        """Generate a random ``YYYY-MM-DD`` date string.

        The day is drawn from 1..28, so every month/day pair is valid.
        """
        year = rng.choice(self._years)
        month = rng.randrange(1, 13)
        day = rng.randrange(1, 29)
        return f"{year}-{month:02}-{day:02}"

    def _random_datetime(self, rng: random.Random) -> str:
        """Generate a random ``YYYY-MM-DDTHH:MM:SSZ`` datetime string."""
        date_part = self._random_date(rng)
        hour = rng.randrange(0, 24)
        minute = rng.randrange(0, 60)
        second = rng.randrange(0, 60)
        return f"{date_part}" f"T{hour:02}:{minute:02}:{second:02}Z"

    def avro_type(self) -> str | list[str]:
        """Return the Avro type for this column.

        Text-like types map to "string"; smallint, integer and uint2 to
        "int"; bigint, uint4, uint8 and both timestamp types to "long";
        double precision and numeric to "double". Any other type name is
        passed through unchanged.

        uint4 maps to "long" rather than "int" because ``kafka_value`` draws
        uint4 values from 0..4294967295, which does not fit Avro's 32-bit
        signed "int".

        Returns:
            The type name, or ``["null", <type>]`` when the column is nullable.
        """
        # NOTE(review): uint8 values above 2**63 - 1 still do not fit an Avro
        # long; confirm how such columns should be represented.
        typ = self.typ
        if typ in ("text", "bytea", "character", "character varying"):
            result = "string"
        elif typ in ("smallint", "integer", "uint2"):
            result = "int"
        elif typ in (
            "bigint",
            "uint4",
            "uint8",
            "timestamp with time zone",
            "timestamp without time zone",
        ):
            result = "long"
        elif typ in ("double precision", "numeric"):
            result = "double"
        else:
            result = typ
        return ["null", result] if self.nullable else result

    def kafka_value(self, rng: random.Random) -> Any:
        """Generate a value suitable for Kafka serialization.

        Raises:
            ValueError: If ``self.typ`` is not one of the handled types.
        """
        # With a truthy default, one draw in ten returns the default. The draw
        # happens before the "NULL" comparison, so it is consumed either way.
        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
            return str(self.default)
        # Nullable columns produce None on one draw in ten.
        if self.nullable and rng.randrange(10) == 0:
            return None

        typ = self.typ

        if typ == "boolean":
            return rng.random() < 0.2

        if typ == "smallint":
            return long_tail_int(-32768, 32767, rng=rng)
        if typ == "integer":
            return long_tail_int(-2147483648, 2147483647, rng=rng)
        if typ == "bigint":
            return long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)

        if typ == "uint2":
            return long_tail_int(0, 65535, rng=rng)
        if typ == "uint4":
            return long_tail_int(0, 4294967295, rng=rng)
        if typ == "uint8":
            return long_tail_int(0, 18446744073709551615, rng=rng)

        if typ in ("float", "double precision", "numeric"):
            number = self._shaped_float(rng)
            if number is None:
                number = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
            return number

        if typ in ("text", "bytea", "character", "character varying"):
            # NOTE(review): literal() comes from pg8000.native and presumably
            # SQL-quotes the string; confirm quoting is wanted in Kafka payloads.
            text = self._shaped_text(rng)
            if text is None:
                max_len = 100 if typ in ("text", "bytea") else 10
                text = long_tail_text(self.chars, max_len, self._hot_strings, rng=rng)
            return literal(text)

        if typ == "uuid":
            return str(uuid.UUID(int=rng.getrandbits(128), version=4))

        if typ == "jsonb":
            payload = {
                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
            }
            return json.dumps(payload)

        if typ in ("timestamp with time zone", "timestamp without time zone"):
            # Epoch millis spread uniformly across 2019-2025
            # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
            return rng.randrange(1546300800000, 1767225600000)

        if typ in ("mz_timestamp", "date"):
            return literal(self._random_date(rng))

        if typ == "time":
            # 80% of values come from a small set of common times.
            if rng.random() < 0.8:
                common = ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
                return literal(rng.choice(common))
            hour = rng.randrange(0, 24)
            minute = rng.randrange(0, 60)
            second = rng.randrange(0, 60)
            fraction = rng.randrange(0, 1000000)
            return literal(f"{hour}:{minute}:{second}.{fraction}")

        if typ in ("int2range", "int4range", "int8range"):
            if typ == "int2range":
                low_bound, high_bound = -32768, 32767
            elif typ == "int4range":
                low_bound, high_bound = -2147483648, 2147483647
            else:
                low_bound, high_bound = -9223372036854775808, 9223372036854775807
            first = long_tail_int(low_bound, high_bound, rng=rng)
            second = long_tail_int(low_bound, high_bound, rng=rng)
            # Order the endpoints so the range is never inverted.
            lo, hi = sorted((first, second))
            return literal(f"[{lo},{hi})")

        if typ == "map":
            return {str(i): str(long_tail_int(-100, 100, rng=rng)) for i in range(20)}

        if typ == "text[]":
            values = [
                literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
                for _ in range(5)
            ]
            joined = ", ".join(values)
            return literal(f"{{{joined}}}")

        raise ValueError(f"Unhandled data type {self.typ}")

    def value(self, rng: random.Random, in_query: bool = True) -> Any:
        """Generate a value suitable for SQL queries or COPY operations.

        Args:
            rng: Random source; all randomness is drawn from it.
            in_query: When true, return a string to embed in SQL text;
                otherwise return a plain Python value.

        Returns:
            The generated value. Unhandled types yield "NULL" (in_query) or None.
        """
        # With a truthy default, one draw in ten returns the default. The draw
        # happens before the "NULL" comparison, so it is consumed either way.
        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
            return str(self.default) if in_query else self.default

        # Nullable columns produce a null on one draw in ten.
        if self.nullable and rng.randrange(10) == 0:
            return "NULL" if in_query else None

        typ = self.typ

        if typ == "boolean":
            flag = rng.random() < 0.2
            if not in_query:
                return flag
            return "true" if flag else "false"

        if typ in ("smallint", "integer", "bigint", "uint2", "uint4", "uint8"):
            if typ == "smallint":
                number = long_tail_int(-32768, 32767, rng=rng)
            elif typ == "integer":
                number = long_tail_int(-2147483648, 2147483647, rng=rng)
            elif typ == "bigint":
                number = long_tail_int(
                    -9223372036854775808, 9223372036854775807, rng=rng
                )
            elif typ == "uint2":
                number = long_tail_int(0, 65535, rng=rng)
            elif typ == "uint4":
                number = long_tail_int(0, 4294967295, rng=rng)
            else:
                number = long_tail_int(0, 18446744073709551615, rng=rng)
            return str(number) if in_query else number

        if typ in ("float", "double precision", "numeric"):
            number = self._shaped_float(rng)
            if number is None:
                number = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
            return str(number) if in_query else number

        if typ in ("text", "bytea", "character", "character varying"):
            text = self._shaped_text(rng)
            if text is None:
                max_len = 100 if typ in ("text", "bytea") else 10
                text = long_tail_text(self.chars, max_len, self._hot_strings, rng=rng)
            return literal(text) if in_query else text

        if typ == "uuid":
            # NOTE(review): unlike the text branches, the in_query form is not
            # passed through literal(); confirm callers quote UUIDs themselves.
            generated = uuid.UUID(int=rng.getrandbits(128), version=4)
            return str(generated) if in_query else generated

        if typ == "jsonb":
            payload = {
                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
            }
            encoded = json.dumps(payload)
            return f"'{encoded}'::jsonb" if in_query else encoded

        if typ in (
            "timestamp with time zone",
            "timestamp without time zone",
            "mz_timestamp",
            "date",
        ):
            text = self._random_date(rng)
            return literal(text) if in_query else text

        if typ == "time":
            # 80% of values come from a small set of common times.
            if rng.random() < 0.8:
                text = rng.choice(
                    ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
                )
            else:
                hour = rng.randrange(0, 24)
                minute = rng.randrange(0, 60)
                second = rng.randrange(0, 60)
                fraction = rng.randrange(0, 1000000)
                text = f"{hour}:{minute}:{second}" f".{fraction}"
            return literal(text) if in_query else text

        if typ in ("int2range", "int4range", "int8range"):
            if typ == "int2range":
                low_bound, high_bound = -32768, 32767
            elif typ == "int4range":
                low_bound, high_bound = -2147483648, 2147483647
            else:
                low_bound, high_bound = -9223372036854775808, 9223372036854775807
            first = long_tail_int(low_bound, high_bound, rng=rng)
            second = long_tail_int(low_bound, high_bound, rng=rng)
            # Order the endpoints so the range is never inverted.
            lo, hi = sorted((first, second))
            text = f"[{lo},{hi})"
            return literal(text) if in_query else text

        if typ == "map":
            if in_query:
                entries = [
                    f"'{i}' => {str(long_tail_int(-100, 100, rng=rng))}"
                    for i in range(20)
                ]
                joined = ", ".join(entries)
                return literal(f"{{{joined}}}")
            # COPY text input for map expects the literal form too
            entries = [
                f'"{i}"=>"{str(long_tail_int(-100, 100, rng=rng))}"'
                for i in range(20)
            ]
            return "{" + ",".join(entries) + "}"

        if typ == "text[]":
            if in_query:
                values = [
                    literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
                    for _ in range(5)
                ]
                joined = ", ".join(values)
                return literal(f"{{{joined}}}")
            return [
                long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
                for _ in range(5)
            ]

        # Custom data type, or not supported yet
        return "NULL" if in_query else None
class
Column:
class Column:
    """Represents a column with type information and data generation capabilities."""

    def __init__(
        self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None
    ):
        """Store the column's schema attributes and initialise generator state.

        Args:
            name: Column name; also used as a prefix for generated strings.
            typ: Type name that the generator methods dispatch on.
            nullable: Whether the generators may produce a null value.
            default: Column default; when truthy and not "NULL" it is
                occasionally returned by the generators.
            data_shape: Optional shape hint read by ``_shaped_text`` and
                ``_shaped_float``.
        """
        # Schema information, stored exactly as supplied.
        self.name = name
        self.typ = typ
        self.nullable = nullable
        self.default = default
        self.data_shape = data_shape

        # Alphabet handed to the random text generators.
        self.chars = string.ascii_letters + string.digits

        # Years sampled by _random_date: 2019 through 2025 inclusive.
        self._years = list(range(2019, 2026))
        # Last suffix handed out for the "sequential" data shape.
        self._seq_counter = 0

        # Frequently repeated strings passed to long_tail_text.
        self._hot_strings = [
            f"{name}_a",
            f"{name}_b",
            f"{name}_c",
            "foo",
            "bar",
            "baz",
            "0",
            "1",
            "NULL",
        ]

    def _shaped_text(self, rng: random.Random) -> str | None:
        """Generate text according to data_shape, or None if not applicable.

        Returns None when ``data_shape`` is None or "duration".

        Raises:
            ValueError: If ``data_shape`` is any other unrecognised value.
        """
        shape = self.data_shape
        if shape == "datetime":
            return self._random_datetime(rng)
        if shape == "random":
            length = rng.randrange(5, 40)
            return "".join(rng.choice(self.chars) for _ in range(length))
        if shape == "uuid":
            return str(uuid.UUID(int=rng.getrandbits(128), version=4))
        if shape == "sequential":
            # Stateful: every call advances the per-column counter.
            self._seq_counter += 1
            return f"{self.name}_{self._seq_counter}"
        if shape == "zipfian":
            rank = long_tail_rank(n=10000, a=1.3, rng=rng)
            return f"{self.name}_{rank}"
        if shape not in (None, "duration"):
            raise ValueError(f"Unhandled data_shape {self.data_shape!r}")
        return None

    def _shaped_float(self, rng: random.Random) -> float | None:
        """Generate a float according to data_shape, or None if not applicable."""
        if self.data_shape != "duration":
            return None
        # Uniform draw between 10.0 and 1800.0, rounded to two decimals.
        return round(rng.uniform(10.0, 1800.0), 2)

    def _random_date(self, rng: random.Random) -> str:
        """Generate a random ``YYYY-MM-DD`` date string.

        The day is drawn from 1..28, so every month/day pair is valid.
        """
        year = rng.choice(self._years)
        month = rng.randrange(1, 13)
        day = rng.randrange(1, 29)
        return f"{year}-{month:02}-{day:02}"

    def _random_datetime(self, rng: random.Random) -> str:
        """Generate a random ``YYYY-MM-DDTHH:MM:SSZ`` datetime string."""
        date_part = self._random_date(rng)
        hour = rng.randrange(0, 24)
        minute = rng.randrange(0, 60)
        second = rng.randrange(0, 60)
        return f"{date_part}" f"T{hour:02}:{minute:02}:{second:02}Z"

    def avro_type(self) -> str | list[str]:
        """Return the Avro type for this column.

        Text-like types map to "string"; smallint, integer and uint2 to
        "int"; bigint, uint4, uint8 and both timestamp types to "long";
        double precision and numeric to "double". Any other type name is
        passed through unchanged.

        uint4 maps to "long" rather than "int" because ``kafka_value`` draws
        uint4 values from 0..4294967295, which does not fit Avro's 32-bit
        signed "int".

        Returns:
            The type name, or ``["null", <type>]`` when the column is nullable.
        """
        # NOTE(review): uint8 values above 2**63 - 1 still do not fit an Avro
        # long; confirm how such columns should be represented.
        typ = self.typ
        if typ in ("text", "bytea", "character", "character varying"):
            result = "string"
        elif typ in ("smallint", "integer", "uint2"):
            result = "int"
        elif typ in (
            "bigint",
            "uint4",
            "uint8",
            "timestamp with time zone",
            "timestamp without time zone",
        ):
            result = "long"
        elif typ in ("double precision", "numeric"):
            result = "double"
        else:
            result = typ
        return ["null", result] if self.nullable else result

    def kafka_value(self, rng: random.Random) -> Any:
        """Generate a value suitable for Kafka serialization.

        Raises:
            ValueError: If ``self.typ`` is not one of the handled types.
        """
        # With a truthy default, one draw in ten returns the default. The draw
        # happens before the "NULL" comparison, so it is consumed either way.
        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
            return str(self.default)
        # Nullable columns produce None on one draw in ten.
        if self.nullable and rng.randrange(10) == 0:
            return None

        typ = self.typ

        if typ == "boolean":
            return rng.random() < 0.2

        if typ == "smallint":
            return long_tail_int(-32768, 32767, rng=rng)
        if typ == "integer":
            return long_tail_int(-2147483648, 2147483647, rng=rng)
        if typ == "bigint":
            return long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)

        if typ == "uint2":
            return long_tail_int(0, 65535, rng=rng)
        if typ == "uint4":
            return long_tail_int(0, 4294967295, rng=rng)
        if typ == "uint8":
            return long_tail_int(0, 18446744073709551615, rng=rng)

        if typ in ("float", "double precision", "numeric"):
            number = self._shaped_float(rng)
            if number is None:
                number = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
            return number

        if typ in ("text", "bytea", "character", "character varying"):
            # NOTE(review): literal() comes from pg8000.native and presumably
            # SQL-quotes the string; confirm quoting is wanted in Kafka payloads.
            text = self._shaped_text(rng)
            if text is None:
                max_len = 100 if typ in ("text", "bytea") else 10
                text = long_tail_text(self.chars, max_len, self._hot_strings, rng=rng)
            return literal(text)

        if typ == "uuid":
            return str(uuid.UUID(int=rng.getrandbits(128), version=4))

        if typ == "jsonb":
            payload = {
                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
            }
            return json.dumps(payload)

        if typ in ("timestamp with time zone", "timestamp without time zone"):
            # Epoch millis spread uniformly across 2019-2025
            # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
            return rng.randrange(1546300800000, 1767225600000)

        if typ in ("mz_timestamp", "date"):
            return literal(self._random_date(rng))

        if typ == "time":
            # 80% of values come from a small set of common times.
            if rng.random() < 0.8:
                common = ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
                return literal(rng.choice(common))
            hour = rng.randrange(0, 24)
            minute = rng.randrange(0, 60)
            second = rng.randrange(0, 60)
            fraction = rng.randrange(0, 1000000)
            return literal(f"{hour}:{minute}:{second}.{fraction}")

        if typ in ("int2range", "int4range", "int8range"):
            if typ == "int2range":
                low_bound, high_bound = -32768, 32767
            elif typ == "int4range":
                low_bound, high_bound = -2147483648, 2147483647
            else:
                low_bound, high_bound = -9223372036854775808, 9223372036854775807
            first = long_tail_int(low_bound, high_bound, rng=rng)
            second = long_tail_int(low_bound, high_bound, rng=rng)
            # Order the endpoints so the range is never inverted.
            lo, hi = sorted((first, second))
            return literal(f"[{lo},{hi})")

        if typ == "map":
            return {str(i): str(long_tail_int(-100, 100, rng=rng)) for i in range(20)}

        if typ == "text[]":
            values = [
                literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
                for _ in range(5)
            ]
            joined = ", ".join(values)
            return literal(f"{{{joined}}}")

        raise ValueError(f"Unhandled data type {self.typ}")

    def value(self, rng: random.Random, in_query: bool = True) -> Any:
        """Generate a value suitable for SQL queries or COPY operations.

        Args:
            rng: Random source; all randomness is drawn from it.
            in_query: When true, return a string to embed in SQL text;
                otherwise return a plain Python value.

        Returns:
            The generated value. Unhandled types yield "NULL" (in_query) or None.
        """
        # With a truthy default, one draw in ten returns the default. The draw
        # happens before the "NULL" comparison, so it is consumed either way.
        if self.default and rng.randrange(10) == 0 and self.default != "NULL":
            return str(self.default) if in_query else self.default

        # Nullable columns produce a null on one draw in ten.
        if self.nullable and rng.randrange(10) == 0:
            return "NULL" if in_query else None

        typ = self.typ

        if typ == "boolean":
            flag = rng.random() < 0.2
            if not in_query:
                return flag
            return "true" if flag else "false"

        if typ in ("smallint", "integer", "bigint", "uint2", "uint4", "uint8"):
            if typ == "smallint":
                number = long_tail_int(-32768, 32767, rng=rng)
            elif typ == "integer":
                number = long_tail_int(-2147483648, 2147483647, rng=rng)
            elif typ == "bigint":
                number = long_tail_int(
                    -9223372036854775808, 9223372036854775807, rng=rng
                )
            elif typ == "uint2":
                number = long_tail_int(0, 65535, rng=rng)
            elif typ == "uint4":
                number = long_tail_int(0, 4294967295, rng=rng)
            else:
                number = long_tail_int(0, 18446744073709551615, rng=rng)
            return str(number) if in_query else number

        if typ in ("float", "double precision", "numeric"):
            number = self._shaped_float(rng)
            if number is None:
                number = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
            return str(number) if in_query else number

        if typ in ("text", "bytea", "character", "character varying"):
            text = self._shaped_text(rng)
            if text is None:
                max_len = 100 if typ in ("text", "bytea") else 10
                text = long_tail_text(self.chars, max_len, self._hot_strings, rng=rng)
            return literal(text) if in_query else text

        if typ == "uuid":
            # NOTE(review): unlike the text branches, the in_query form is not
            # passed through literal(); confirm callers quote UUIDs themselves.
            generated = uuid.UUID(int=rng.getrandbits(128), version=4)
            return str(generated) if in_query else generated

        if typ == "jsonb":
            payload = {
                f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
            }
            encoded = json.dumps(payload)
            return f"'{encoded}'::jsonb" if in_query else encoded

        if typ in (
            "timestamp with time zone",
            "timestamp without time zone",
            "mz_timestamp",
            "date",
        ):
            text = self._random_date(rng)
            return literal(text) if in_query else text

        if typ == "time":
            # 80% of values come from a small set of common times.
            if rng.random() < 0.8:
                text = rng.choice(
                    ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
                )
            else:
                hour = rng.randrange(0, 24)
                minute = rng.randrange(0, 60)
                second = rng.randrange(0, 60)
                fraction = rng.randrange(0, 1000000)
                text = f"{hour}:{minute}:{second}" f".{fraction}"
            return literal(text) if in_query else text

        if typ in ("int2range", "int4range", "int8range"):
            if typ == "int2range":
                low_bound, high_bound = -32768, 32767
            elif typ == "int4range":
                low_bound, high_bound = -2147483648, 2147483647
            else:
                low_bound, high_bound = -9223372036854775808, 9223372036854775807
            first = long_tail_int(low_bound, high_bound, rng=rng)
            second = long_tail_int(low_bound, high_bound, rng=rng)
            # Order the endpoints so the range is never inverted.
            lo, hi = sorted((first, second))
            text = f"[{lo},{hi})"
            return literal(text) if in_query else text

        if typ == "map":
            if in_query:
                entries = [
                    f"'{i}' => {str(long_tail_int(-100, 100, rng=rng))}"
                    for i in range(20)
                ]
                joined = ", ".join(entries)
                return literal(f"{{{joined}}}")
            # COPY text input for map expects the literal form too
            entries = [
                f'"{i}"=>"{str(long_tail_int(-100, 100, rng=rng))}"'
                for i in range(20)
            ]
            return "{" + ",".join(entries) + "}"

        if typ == "text[]":
            if in_query:
                values = [
                    literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
                    for _ in range(5)
                ]
                joined = ", ".join(values)
                return literal(f"{{{joined}}}")
            return [
                long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
                for _ in range(5)
            ]

        # Custom data type, or not supported yet
        return "NULL" if in_query else None
Represents a column with type information and data generation capabilities.
Column( name: str, typ: str, nullable: bool, default: Any, data_shape: str | None)
def __init__(
    self, name: str, typ: str, nullable: bool, default: Any, data_shape: str | None
):
    """Store the column's schema attributes and initialise generator state.

    Args:
        name: Column name; also used as a prefix for the hot strings.
        typ: Type name of the column.
        nullable: Whether the column accepts nulls.
        default: Column default value, stored as given.
        data_shape: Optional shape hint for generated data, stored as given.
    """
    # Schema information, stored exactly as supplied.
    self.name, self.typ = name, typ
    self.nullable, self.default = nullable, default
    self.data_shape = data_shape

    # Alphabet of ASCII letters followed by digits.
    self.chars = string.ascii_letters + string.digits

    # Candidate years: 2019 through 2025 inclusive.
    self._years = [*range(2019, 2026)]
    # Running counter, starts at zero.
    self._seq_counter = 0

    # Three column-specific strings followed by six fixed ones.
    column_specific = [f"{name}_a", f"{name}_b", f"{name}_c"]
    shared = ["foo", "bar", "baz", "0", "1", "NULL"]
    self._hot_strings = column_specific + shared
def
avro_type(self) -> str | list[str]:
def avro_type(self) -> str | list[str]:
    """Return the Avro type for this column.

    Text-like types map to "string"; smallint, integer and uint2 to "int";
    bigint, uint4, uint8 and both timestamp types to "long"; double
    precision and numeric to "double". Any other type name is passed
    through unchanged.

    uint4 maps to "long" rather than "int" because ``kafka_value`` draws
    uint4 values from 0..4294967295, which does not fit Avro's 32-bit
    signed "int".

    Returns:
        The type name, or ``["null", <type>]`` when the column is nullable.
    """
    # NOTE(review): uint8 values above 2**63 - 1 still do not fit an Avro
    # long; confirm how such columns should be represented.
    typ = self.typ
    if typ in ("text", "bytea", "character", "character varying"):
        result = "string"
    elif typ in ("smallint", "integer", "uint2"):
        result = "int"
    elif typ in (
        "bigint",
        "uint4",
        "uint8",
        "timestamp with time zone",
        "timestamp without time zone",
    ):
        result = "long"
    elif typ in ("double precision", "numeric"):
        result = "double"
    else:
        result = typ
    return ["null", result] if self.nullable else result
Return the Avro type for this column.
def
kafka_value(self, rng: random.Random) -> Any:
def kafka_value(self, rng: random.Random) -> Any:
    """Produce one randomly generated value for this column in Kafka form.

    Args:
        rng: Random source; all randomness is drawn from it.

    Returns:
        The column default (as a string), None, or a value whose Python
        type depends on ``self.typ``.

    Raises:
        ValueError: If ``self.typ`` is not one of the handled types.
    """
    # With a truthy default, one draw in ten returns the default. The draw
    # happens before the "NULL" comparison, so it is consumed either way.
    if self.default and rng.randrange(10) == 0 and self.default != "NULL":
        return str(self.default)
    # Nullable columns produce None on one draw in ten.
    if self.nullable and rng.randrange(10) == 0:
        return None

    typ = self.typ

    if typ == "boolean":
        return rng.random() < 0.2

    if typ == "smallint":
        return long_tail_int(-32768, 32767, rng=rng)
    if typ == "integer":
        return long_tail_int(-2147483648, 2147483647, rng=rng)
    if typ == "bigint":
        return long_tail_int(-9223372036854775808, 9223372036854775807, rng=rng)

    if typ == "uint2":
        return long_tail_int(0, 65535, rng=rng)
    if typ == "uint4":
        return long_tail_int(0, 4294967295, rng=rng)
    if typ == "uint8":
        return long_tail_int(0, 18446744073709551615, rng=rng)

    if typ in ("float", "double precision", "numeric"):
        number = self._shaped_float(rng)
        if number is None:
            number = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
        return number

    if typ in ("text", "bytea", "character", "character varying"):
        # NOTE(review): literal() comes from pg8000.native and presumably
        # SQL-quotes the string; confirm quoting is wanted in Kafka payloads.
        text = self._shaped_text(rng)
        if text is None:
            # text/bytea use a length argument of 100, character types 10.
            max_len = 100 if typ in ("text", "bytea") else 10
            text = long_tail_text(self.chars, max_len, self._hot_strings, rng=rng)
        return literal(text)

    if typ == "uuid":
        return str(uuid.UUID(int=rng.getrandbits(128), version=4))

    if typ == "jsonb":
        payload = {
            f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
        }
        return json.dumps(payload)

    if typ in ("timestamp with time zone", "timestamp without time zone"):
        # Epoch millis spread uniformly across 2019-2025
        # 2019-01-01 = 1546300800000, 2026-01-01 = 1767225600000
        return rng.randrange(1546300800000, 1767225600000)

    if typ in ("mz_timestamp", "date"):
        return literal(self._random_date(rng))

    if typ == "time":
        # 80% of values come from a small set of common times.
        if rng.random() < 0.8:
            common = ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
            return literal(rng.choice(common))
        hour = rng.randrange(0, 24)
        minute = rng.randrange(0, 60)
        second = rng.randrange(0, 60)
        fraction = rng.randrange(0, 1000000)
        return literal(f"{hour}:{minute}:{second}.{fraction}")

    if typ in ("int2range", "int4range", "int8range"):
        if typ == "int2range":
            low_bound, high_bound = -32768, 32767
        elif typ == "int4range":
            low_bound, high_bound = -2147483648, 2147483647
        else:
            low_bound, high_bound = -9223372036854775808, 9223372036854775807
        first = long_tail_int(low_bound, high_bound, rng=rng)
        second = long_tail_int(low_bound, high_bound, rng=rng)
        # Order the endpoints so the range is never inverted.
        lo, hi = sorted((first, second))
        return literal(f"[{lo},{hi})")

    if typ == "map":
        return {str(i): str(long_tail_int(-100, 100, rng=rng)) for i in range(20)}

    if typ == "text[]":
        values = [
            literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
            for _ in range(5)
        ]
        joined = ", ".join(values)
        return literal(f"{{{joined}}}")

    raise ValueError(f"Unhandled data type {self.typ}")
Generate a value suitable for Kafka serialization.
def
value(self, rng: random.Random, in_query: bool = True) -> Any:
def value(self, rng: random.Random, in_query: bool = True) -> Any:
    """Produce one randomly generated value for SQL text or for COPY.

    Args:
        rng: Random source; all randomness is drawn from it.
        in_query: When true, return a string to embed in SQL text;
            otherwise return a plain Python value.

    Returns:
        The generated value. Types that are not handled yield "NULL"
        (in_query) or None instead of raising.
    """
    # With a truthy default, one draw in ten returns the default. The draw
    # happens before the "NULL" comparison, so it is consumed either way.
    if self.default and rng.randrange(10) == 0 and self.default != "NULL":
        return str(self.default) if in_query else self.default

    # Nullable columns produce a null on one draw in ten.
    if self.nullable and rng.randrange(10) == 0:
        return "NULL" if in_query else None

    typ = self.typ

    if typ == "boolean":
        flag = rng.random() < 0.2
        if not in_query:
            return flag
        return "true" if flag else "false"

    if typ in ("smallint", "integer", "bigint", "uint2", "uint4", "uint8"):
        if typ == "smallint":
            number = long_tail_int(-32768, 32767, rng=rng)
        elif typ == "integer":
            number = long_tail_int(-2147483648, 2147483647, rng=rng)
        elif typ == "bigint":
            number = long_tail_int(
                -9223372036854775808, 9223372036854775807, rng=rng
            )
        elif typ == "uint2":
            number = long_tail_int(0, 65535, rng=rng)
        elif typ == "uint4":
            number = long_tail_int(0, 4294967295, rng=rng)
        else:
            number = long_tail_int(0, 18446744073709551615, rng=rng)
        return str(number) if in_query else number

    if typ in ("float", "double precision", "numeric"):
        number = self._shaped_float(rng)
        if number is None:
            number = long_tail_float(-1_000_000_000.0, 1_000_000_000.0, rng=rng)
        return str(number) if in_query else number

    if typ in ("text", "bytea", "character", "character varying"):
        text = self._shaped_text(rng)
        if text is None:
            # text/bytea use a length argument of 100, character types 10.
            max_len = 100 if typ in ("text", "bytea") else 10
            text = long_tail_text(self.chars, max_len, self._hot_strings, rng=rng)
        return literal(text) if in_query else text

    if typ == "uuid":
        # NOTE(review): unlike the text branches, the in_query form is not
        # passed through literal(); confirm callers quote UUIDs themselves.
        generated = uuid.UUID(int=rng.getrandbits(128), version=4)
        return str(generated) if in_query else generated

    if typ == "jsonb":
        payload = {
            f"key{key}": str(long_tail_int(-100, 100, rng=rng)) for key in range(20)
        }
        encoded = json.dumps(payload)
        return f"'{encoded}'::jsonb" if in_query else encoded

    if typ in (
        "timestamp with time zone",
        "timestamp without time zone",
        "mz_timestamp",
        "date",
    ):
        # All four types are generated as a plain date string here.
        text = self._random_date(rng)
        return literal(text) if in_query else text

    if typ == "time":
        # 80% of values come from a small set of common times.
        if rng.random() < 0.8:
            text = rng.choice(
                ["00:00:00.000000", "12:00:00.000000", "23:59:59.000000"]
            )
        else:
            hour = rng.randrange(0, 24)
            minute = rng.randrange(0, 60)
            second = rng.randrange(0, 60)
            fraction = rng.randrange(0, 1000000)
            text = f"{hour}:{minute}:{second}" f".{fraction}"
        return literal(text) if in_query else text

    if typ in ("int2range", "int4range", "int8range"):
        if typ == "int2range":
            low_bound, high_bound = -32768, 32767
        elif typ == "int4range":
            low_bound, high_bound = -2147483648, 2147483647
        else:
            low_bound, high_bound = -9223372036854775808, 9223372036854775807
        first = long_tail_int(low_bound, high_bound, rng=rng)
        second = long_tail_int(low_bound, high_bound, rng=rng)
        # Order the endpoints so the range is never inverted.
        lo, hi = sorted((first, second))
        text = f"[{lo},{hi})"
        return literal(text) if in_query else text

    if typ == "map":
        if in_query:
            entries = [
                f"'{i}' => {str(long_tail_int(-100, 100, rng=rng))}"
                for i in range(20)
            ]
            joined = ", ".join(entries)
            return literal(f"{{{joined}}}")
        # COPY text input for map expects the literal form too
        entries = [
            f'"{i}"=>"{str(long_tail_int(-100, 100, rng=rng))}"'
            for i in range(20)
        ]
        return "{" + ",".join(entries) + "}"

    if typ == "text[]":
        if in_query:
            values = [
                literal(long_tail_text(self.chars, 100, self._hot_strings, rng=rng))
                for _ in range(5)
            ]
            joined = ", ".join(values)
            return literal(f"{{{joined}}}")
        return [
            long_tail_text(self.chars, 100, self._hot_strings, rng=rng)
            for _ in range(5)
        ]

    # Custom data type, or not supported yet
    return "NULL" if in_query else None
Generate a value suitable for SQL queries or COPY operations.