1use std::sync::LazyLock;
13use std::time::Duration;
14
15use mz_ore::now::NowFn;
16use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
17use mz_repr::adt::numeric::NumericMaxScale;
18use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Row, ScalarType};
19use proptest_derive::Arbitrary;
20use serde::{Deserialize, Serialize};
21use std::collections::BTreeSet;
22
23use crate::AlterCompatible;
24use crate::sources::AlterError;
25use crate::sources::{MzOffset, SourceConnection};
26
27use super::SourceExportDetails;
28
// Textually include the file `mz_storage_types.sources.load_generator.rs`
// from the build's `OUT_DIR`.
// NOTE(review): presumably this generated file is where the `Proto*` types
// and `proto_*` modules used below are defined — confirm against the build
// script.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_types.sources.load_generator.rs"
));
33
/// The string `"offset"`.
///
/// NOTE(review): presumably the default column name used for
/// `KeyValueLoadGenerator::include_offset` — this file never reads the
/// constant, so confirm against the planner.
pub const LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT: &str = "offset";
35
/// An event yielded by a load generator.
///
/// `F` is an iterable type and `D` is the payload type. In this file the only
/// instantiation is `Event<Option<MzOffset>, (Row, Diff)>`, as the item type
/// of the iterator returned by [`Generator::by_seed`].
#[derive(Debug)]
pub enum Event<F: IntoIterator, D> {
    /// Carries a value of the iterable type `F` itself.
    ///
    /// NOTE(review): presumably a progress/frontier statement — confirm with
    /// the generator implementations.
    Progress(F),
    /// Carries a single item of `F` together with a payload `D`.
    Message(F::Item, D),
}
44
/// Connection description for a load generator source.
///
/// Round-trips through `ProtoLoadGeneratorSourceConnection` via its
/// `RustType` implementation in this file.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct LoadGeneratorSourceConnection {
    /// Which generator to run, including its generator-specific options.
    pub load_generator: LoadGenerator,
    /// NOTE(review): presumably the tick interval in microseconds, with
    /// `None` meaning "use a default" — confirm against the source
    /// implementation.
    pub tick_micros: Option<u64>,
    /// NOTE(review): presumably a lower bound on emitted offsets/ticks —
    /// confirm.
    pub as_of: u64,
    /// NOTE(review): presumably an upper bound on emitted offsets/ticks —
    /// confirm.
    pub up_to: u64,
}
52
/// Lazily built relation description with a single nullable `UInt64` column
/// named `offset`.
///
/// `LoadGeneratorSourceConnection::timestamp_desc` returns a clone of this
/// value.
pub static LOAD_GEN_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        .with_column("offset", ScalarType::UInt64.nullable(true))
        .finish()
});
58
59impl SourceConnection for LoadGeneratorSourceConnection {
60 fn name(&self) -> &'static str {
61 "load-generator"
62 }
63
64 fn external_reference(&self) -> Option<&str> {
65 None
66 }
67
68 fn default_key_desc(&self) -> RelationDesc {
69 match &self.load_generator {
70 LoadGenerator::KeyValue(_) => {
71 RelationDesc::builder()
73 .with_column("key", ScalarType::UInt64.nullable(false))
74 .finish()
75 }
76 _ => RelationDesc::empty(),
77 }
78 }
79
80 fn default_value_desc(&self) -> RelationDesc {
81 match &self.load_generator {
82 LoadGenerator::Auction => RelationDesc::empty(),
83 LoadGenerator::Clock => RelationDesc::builder()
84 .with_column(
85 "time",
86 ScalarType::TimestampTz { precision: None }.nullable(false),
87 )
88 .finish(),
89 LoadGenerator::Datums => {
90 let mut desc =
91 RelationDesc::builder().with_column("rowid", ScalarType::Int64.nullable(false));
92 let typs = ScalarType::enumerate();
93 let mut names = BTreeSet::new();
94 for typ in typs {
95 let mut name = format!("_{:?}", typ)
97 .split(' ')
98 .next()
99 .unwrap()
100 .to_lowercase();
101 while names.contains(&name) {
104 name.push('_');
105 }
106 names.insert(name.clone());
107 desc = desc.with_column(name, typ.clone().nullable(true));
108 }
109 desc.finish()
110 }
111 LoadGenerator::Counter { .. } => RelationDesc::builder()
112 .with_column("counter", ScalarType::Int64.nullable(false))
113 .finish(),
114 LoadGenerator::Marketing => RelationDesc::empty(),
115 LoadGenerator::Tpch { .. } => RelationDesc::empty(),
116 LoadGenerator::KeyValue(KeyValueLoadGenerator { include_offset, .. }) => {
117 let mut desc = RelationDesc::builder()
118 .with_column("partition", ScalarType::UInt64.nullable(false))
119 .with_column("value", ScalarType::Bytes.nullable(false));
120
121 if let Some(offset_name) = include_offset.as_deref() {
122 desc = desc.with_column(offset_name, ScalarType::UInt64.nullable(false));
123 }
124 desc.finish()
125 }
126 }
127 }
128
129 fn timestamp_desc(&self) -> RelationDesc {
130 LOAD_GEN_PROGRESS_DESC.clone()
131 }
132
133 fn connection_id(&self) -> Option<CatalogItemId> {
134 None
135 }
136
137 fn primary_export_details(&self) -> SourceExportDetails {
140 match &self.load_generator {
141 LoadGenerator::Auction => SourceExportDetails::None,
142 LoadGenerator::Clock => {
143 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
144 output: LoadGeneratorOutput::Default,
145 })
146 }
147 LoadGenerator::Datums => {
148 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
149 output: LoadGeneratorOutput::Default,
150 })
151 }
152 LoadGenerator::Counter { .. } => {
153 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
154 output: LoadGeneratorOutput::Default,
155 })
156 }
157 LoadGenerator::Marketing => SourceExportDetails::None,
158 LoadGenerator::Tpch { .. } => SourceExportDetails::None,
159 LoadGenerator::KeyValue(_) => {
160 SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
161 output: LoadGeneratorOutput::Default,
162 })
163 }
164 }
165 }
166
167 fn supports_read_only(&self) -> bool {
168 true
169 }
170
171 fn prefers_single_replica(&self) -> bool {
172 false
173 }
174}
175
// Empty impl: no trait method is overridden here, so whatever default
// behavior `AlterCompatible` provides applies to this type.
impl crate::AlterCompatible for LoadGeneratorSourceConnection {}
177
/// The kinds of load generator, with their generator-specific options.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum LoadGenerator {
    /// Exposes the `organizations`, `users`, `accounts`, `auctions` and
    /// `bids` views (see `LoadGenerator::views`).
    Auction,
    /// Default output has a single `time` column.
    Clock,
    /// Default output has a single `counter` column.
    Counter {
        /// `None` makes `LoadGenerator::is_monotonic` report `true`; any
        /// `Some` value makes it report `false`.
        ///
        /// NOTE(review): presumably bounds how many values are live at once —
        /// confirm against the generator implementation.
        max_cardinality: Option<u64>,
    },
    /// Default output has a `rowid` column plus one nullable column per
    /// scalar type returned by `ScalarType::enumerate()`.
    Datums,
    /// Exposes the `customers`, `impressions`, `clicks`, `leads`, `coupons`
    /// and `conversion_predictions` views.
    Marketing,
    /// Exposes the eight TPC-H-named views (`supplier`, `part`, `partsupp`,
    /// `customer`, `orders`, `lineitem`, `nation`, `region`).
    ///
    /// NOTE(review): the `count_*` fields presumably size the corresponding
    /// tables — confirm against the generator implementation.
    Tpch {
        count_supplier: i64,
        count_part: i64,
        count_customer: i64,
        count_orders: i64,
        count_clerk: i64,
    },
    /// Key-value generator configured by a [`KeyValueLoadGenerator`].
    KeyValue(KeyValueLoadGenerator),
}
199
/// The string `"mz_load_generators"`.
///
/// NOTE(review): presumably the database component under which load generator
/// outputs are referenced — this file never reads the constant, so confirm
/// against its users.
pub const LOAD_GENERATOR_DATABASE_NAME: &str = "mz_load_generators";
201
202impl LoadGenerator {
203 pub fn schema_name(&self) -> &'static str {
206 match self {
207 LoadGenerator::Counter { .. } => "counter",
208 LoadGenerator::Clock => "clock",
209 LoadGenerator::Marketing => "marketing",
210 LoadGenerator::Auction => "auction",
211 LoadGenerator::Datums => "datums",
212 LoadGenerator::Tpch { .. } => "tpch",
213 LoadGenerator::KeyValue { .. } => "key_value",
214 }
215 }
216
217 pub fn views(&self) -> Vec<(&str, RelationDesc, LoadGeneratorOutput)> {
219 match self {
220 LoadGenerator::Auction => vec![
221 (
222 "organizations",
223 RelationDesc::builder()
224 .with_column("id", ScalarType::Int64.nullable(false))
225 .with_column("name", ScalarType::String.nullable(false))
226 .with_key(vec![0])
227 .finish(),
228 LoadGeneratorOutput::Auction(AuctionView::Organizations),
229 ),
230 (
231 "users",
232 RelationDesc::builder()
233 .with_column("id", ScalarType::Int64.nullable(false))
234 .with_column("org_id", ScalarType::Int64.nullable(false))
235 .with_column("name", ScalarType::String.nullable(false))
236 .with_key(vec![0])
237 .finish(),
238 LoadGeneratorOutput::Auction(AuctionView::Users),
239 ),
240 (
241 "accounts",
242 RelationDesc::builder()
243 .with_column("id", ScalarType::Int64.nullable(false))
244 .with_column("org_id", ScalarType::Int64.nullable(false))
245 .with_column("balance", ScalarType::Int64.nullable(false))
246 .with_key(vec![0])
247 .finish(),
248 LoadGeneratorOutput::Auction(AuctionView::Accounts),
249 ),
250 (
251 "auctions",
252 RelationDesc::builder()
253 .with_column("id", ScalarType::Int64.nullable(false))
254 .with_column("seller", ScalarType::Int64.nullable(false))
255 .with_column("item", ScalarType::String.nullable(false))
256 .with_column(
257 "end_time",
258 ScalarType::TimestampTz { precision: None }.nullable(false),
259 )
260 .with_key(vec![0])
261 .finish(),
262 LoadGeneratorOutput::Auction(AuctionView::Auctions),
263 ),
264 (
265 "bids",
266 RelationDesc::builder()
267 .with_column("id", ScalarType::Int64.nullable(false))
268 .with_column("buyer", ScalarType::Int64.nullable(false))
269 .with_column("auction_id", ScalarType::Int64.nullable(false))
270 .with_column("amount", ScalarType::Int32.nullable(false))
271 .with_column(
272 "bid_time",
273 ScalarType::TimestampTz { precision: None }.nullable(false),
274 )
275 .with_key(vec![0])
276 .finish(),
277 LoadGeneratorOutput::Auction(AuctionView::Bids),
278 ),
279 ],
280 LoadGenerator::Clock => vec![],
281 LoadGenerator::Counter { max_cardinality: _ } => vec![],
282 LoadGenerator::Marketing => {
283 vec![
284 (
285 "customers",
286 RelationDesc::builder()
287 .with_column("id", ScalarType::Int64.nullable(false))
288 .with_column("email", ScalarType::String.nullable(false))
289 .with_column("income", ScalarType::Int64.nullable(false))
290 .with_key(vec![0])
291 .finish(),
292 LoadGeneratorOutput::Marketing(MarketingView::Customers),
293 ),
294 (
295 "impressions",
296 RelationDesc::builder()
297 .with_column("id", ScalarType::Int64.nullable(false))
298 .with_column("customer_id", ScalarType::Int64.nullable(false))
299 .with_column("campaign_id", ScalarType::Int64.nullable(false))
300 .with_column(
301 "impression_time",
302 ScalarType::TimestampTz { precision: None }.nullable(false),
303 )
304 .with_key(vec![0])
305 .finish(),
306 LoadGeneratorOutput::Marketing(MarketingView::Impressions),
307 ),
308 (
309 "clicks",
310 RelationDesc::builder()
311 .with_column("impression_id", ScalarType::Int64.nullable(false))
312 .with_column(
313 "click_time",
314 ScalarType::TimestampTz { precision: None }.nullable(false),
315 )
316 .finish(),
317 LoadGeneratorOutput::Marketing(MarketingView::Clicks),
318 ),
319 (
320 "leads",
321 RelationDesc::builder()
322 .with_column("id", ScalarType::Int64.nullable(false))
323 .with_column("customer_id", ScalarType::Int64.nullable(false))
324 .with_column(
325 "created_at",
326 ScalarType::TimestampTz { precision: None }.nullable(false),
327 )
328 .with_column(
329 "converted_at",
330 ScalarType::TimestampTz { precision: None }.nullable(true),
331 )
332 .with_column("conversion_amount", ScalarType::Int64.nullable(true))
333 .with_key(vec![0])
334 .finish(),
335 LoadGeneratorOutput::Marketing(MarketingView::Leads),
336 ),
337 (
338 "coupons",
339 RelationDesc::builder()
340 .with_column("id", ScalarType::Int64.nullable(false))
341 .with_column("lead_id", ScalarType::Int64.nullable(false))
342 .with_column(
343 "created_at",
344 ScalarType::TimestampTz { precision: None }.nullable(false),
345 )
346 .with_column("amount", ScalarType::Int64.nullable(false))
347 .with_key(vec![0])
348 .finish(),
349 LoadGeneratorOutput::Marketing(MarketingView::Coupons),
350 ),
351 (
352 "conversion_predictions",
353 RelationDesc::builder()
354 .with_column("lead_id", ScalarType::Int64.nullable(false))
355 .with_column("experiment_bucket", ScalarType::String.nullable(false))
356 .with_column(
357 "predicted_at",
358 ScalarType::TimestampTz { precision: None }.nullable(false),
359 )
360 .with_column("score", ScalarType::Float64.nullable(false))
361 .finish(),
362 LoadGeneratorOutput::Marketing(MarketingView::ConversionPredictions),
363 ),
364 ]
365 }
366 LoadGenerator::Datums => vec![],
367 LoadGenerator::Tpch { .. } => {
368 let identifier = ScalarType::Int64.nullable(false);
369 let decimal = ScalarType::Numeric {
370 max_scale: Some(NumericMaxScale::try_from(2i64).unwrap()),
371 }
372 .nullable(false);
373 vec![
374 (
375 "supplier",
376 RelationDesc::builder()
377 .with_column("s_suppkey", identifier.clone())
378 .with_column("s_name", ScalarType::String.nullable(false))
379 .with_column("s_address", ScalarType::String.nullable(false))
380 .with_column("s_nationkey", identifier.clone())
381 .with_column("s_phone", ScalarType::String.nullable(false))
382 .with_column("s_acctbal", decimal.clone())
383 .with_column("s_comment", ScalarType::String.nullable(false))
384 .with_key(vec![0])
385 .finish(),
386 LoadGeneratorOutput::Tpch(TpchView::Supplier),
387 ),
388 (
389 "part",
390 RelationDesc::builder()
391 .with_column("p_partkey", identifier.clone())
392 .with_column("p_name", ScalarType::String.nullable(false))
393 .with_column("p_mfgr", ScalarType::String.nullable(false))
394 .with_column("p_brand", ScalarType::String.nullable(false))
395 .with_column("p_type", ScalarType::String.nullable(false))
396 .with_column("p_size", ScalarType::Int32.nullable(false))
397 .with_column("p_container", ScalarType::String.nullable(false))
398 .with_column("p_retailprice", decimal.clone())
399 .with_column("p_comment", ScalarType::String.nullable(false))
400 .with_key(vec![0])
401 .finish(),
402 LoadGeneratorOutput::Tpch(TpchView::Part),
403 ),
404 (
405 "partsupp",
406 RelationDesc::builder()
407 .with_column("ps_partkey", identifier.clone())
408 .with_column("ps_suppkey", identifier.clone())
409 .with_column("ps_availqty", ScalarType::Int32.nullable(false))
410 .with_column("ps_supplycost", decimal.clone())
411 .with_column("ps_comment", ScalarType::String.nullable(false))
412 .with_key(vec![0, 1])
413 .finish(),
414 LoadGeneratorOutput::Tpch(TpchView::Partsupp),
415 ),
416 (
417 "customer",
418 RelationDesc::builder()
419 .with_column("c_custkey", identifier.clone())
420 .with_column("c_name", ScalarType::String.nullable(false))
421 .with_column("c_address", ScalarType::String.nullable(false))
422 .with_column("c_nationkey", identifier.clone())
423 .with_column("c_phone", ScalarType::String.nullable(false))
424 .with_column("c_acctbal", decimal.clone())
425 .with_column("c_mktsegment", ScalarType::String.nullable(false))
426 .with_column("c_comment", ScalarType::String.nullable(false))
427 .with_key(vec![0])
428 .finish(),
429 LoadGeneratorOutput::Tpch(TpchView::Customer),
430 ),
431 (
432 "orders",
433 RelationDesc::builder()
434 .with_column("o_orderkey", identifier.clone())
435 .with_column("o_custkey", identifier.clone())
436 .with_column("o_orderstatus", ScalarType::String.nullable(false))
437 .with_column("o_totalprice", decimal.clone())
438 .with_column("o_orderdate", ScalarType::Date.nullable(false))
439 .with_column("o_orderpriority", ScalarType::String.nullable(false))
440 .with_column("o_clerk", ScalarType::String.nullable(false))
441 .with_column("o_shippriority", ScalarType::Int32.nullable(false))
442 .with_column("o_comment", ScalarType::String.nullable(false))
443 .with_key(vec![0])
444 .finish(),
445 LoadGeneratorOutput::Tpch(TpchView::Orders),
446 ),
447 (
448 "lineitem",
449 RelationDesc::builder()
450 .with_column("l_orderkey", identifier.clone())
451 .with_column("l_partkey", identifier.clone())
452 .with_column("l_suppkey", identifier.clone())
453 .with_column("l_linenumber", ScalarType::Int32.nullable(false))
454 .with_column("l_quantity", decimal.clone())
455 .with_column("l_extendedprice", decimal.clone())
456 .with_column("l_discount", decimal.clone())
457 .with_column("l_tax", decimal)
458 .with_column("l_returnflag", ScalarType::String.nullable(false))
459 .with_column("l_linestatus", ScalarType::String.nullable(false))
460 .with_column("l_shipdate", ScalarType::Date.nullable(false))
461 .with_column("l_commitdate", ScalarType::Date.nullable(false))
462 .with_column("l_receiptdate", ScalarType::Date.nullable(false))
463 .with_column("l_shipinstruct", ScalarType::String.nullable(false))
464 .with_column("l_shipmode", ScalarType::String.nullable(false))
465 .with_column("l_comment", ScalarType::String.nullable(false))
466 .with_key(vec![0, 3])
467 .finish(),
468 LoadGeneratorOutput::Tpch(TpchView::Lineitem),
469 ),
470 (
471 "nation",
472 RelationDesc::builder()
473 .with_column("n_nationkey", identifier.clone())
474 .with_column("n_name", ScalarType::String.nullable(false))
475 .with_column("n_regionkey", identifier.clone())
476 .with_column("n_comment", ScalarType::String.nullable(false))
477 .with_key(vec![0])
478 .finish(),
479 LoadGeneratorOutput::Tpch(TpchView::Nation),
480 ),
481 (
482 "region",
483 RelationDesc::builder()
484 .with_column("r_regionkey", identifier)
485 .with_column("r_name", ScalarType::String.nullable(false))
486 .with_column("r_comment", ScalarType::String.nullable(false))
487 .with_key(vec![0])
488 .finish(),
489 LoadGeneratorOutput::Tpch(TpchView::Region),
490 ),
491 ]
492 }
493 LoadGenerator::KeyValue(_) => vec![],
494 }
495 }
496
497 pub fn is_monotonic(&self) -> bool {
498 match self {
499 LoadGenerator::Auction => true,
500 LoadGenerator::Clock => false,
501 LoadGenerator::Counter {
502 max_cardinality: None,
503 } => true,
504 LoadGenerator::Counter { .. } => false,
505 LoadGenerator::Marketing => false,
506 LoadGenerator::Datums => true,
507 LoadGenerator::Tpch { .. } => false,
508 LoadGenerator::KeyValue(_) => true,
509 }
510 }
511}
512
/// Identifies one output of a load generator source.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum LoadGeneratorOutput {
    /// The output reported by `primary_export_details` for the `Clock`,
    /// `Datums`, `Counter` and `KeyValue` generators.
    Default,
    /// One of the auction generator's views.
    Auction(AuctionView),
    /// One of the marketing generator's views.
    Marketing(MarketingView),
    /// One of the TPC-H generator's views.
    Tpch(TpchView),
}
524
/// The views exposed by the auction load generator.
///
/// Each variant corresponds to the lowercase view name accepted by this
/// type's `From<&str>` implementation (for example `Organizations` and
/// `"organizations"`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum AuctionView {
    Organizations,
    Users,
    Accounts,
    Auctions,
    Bids,
}
533
/// The views exposed by the marketing load generator.
///
/// Each variant corresponds to the snake_case view name accepted by this
/// type's `From<&str>` implementation (for example `ConversionPredictions`
/// and `"conversion_predictions"`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum MarketingView {
    Customers,
    Impressions,
    Clicks,
    Leads,
    Coupons,
    ConversionPredictions,
}
543
/// The views exposed by the TPC-H load generator.
///
/// Each variant corresponds to the lowercase view name accepted by this
/// type's `From<&str>` implementation (for example `Lineitem` and
/// `"lineitem"`).
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum TpchView {
    Supplier,
    Part,
    Partsupp,
    Customer,
    Orders,
    Lineitem,
    Nation,
    Region,
}
555
556impl From<&str> for AuctionView {
560 fn from(s: &str) -> Self {
561 match s {
562 "organizations" => AuctionView::Organizations,
563 "users" => AuctionView::Users,
564 "accounts" => AuctionView::Accounts,
565 "auctions" => AuctionView::Auctions,
566 "bids" => AuctionView::Bids,
567 _ => panic!("unexpected load generator output name: {}", s),
568 }
569 }
570}
571
572impl From<&str> for MarketingView {
573 fn from(s: &str) -> Self {
574 match s {
575 "customers" => MarketingView::Customers,
576 "impressions" => MarketingView::Impressions,
577 "clicks" => MarketingView::Clicks,
578 "leads" => MarketingView::Leads,
579 "coupons" => MarketingView::Coupons,
580 "conversion_predictions" => MarketingView::ConversionPredictions,
581 _ => panic!("unexpected load generator output name: {}", s),
582 }
583 }
584}
585
586impl From<&str> for TpchView {
587 fn from(s: &str) -> Self {
588 match s {
589 "supplier" => TpchView::Supplier,
590 "part" => TpchView::Part,
591 "partsupp" => TpchView::Partsupp,
592 "customer" => TpchView::Customer,
593 "orders" => TpchView::Orders,
594 "lineitem" => TpchView::Lineitem,
595 "nation" => TpchView::Nation,
596 "region" => TpchView::Region,
597 _ => panic!("unexpected load generator output name: {}", s),
598 }
599 }
600}
601
602impl RustType<ProtoLoadGeneratorAuctionOutput> for AuctionView {
603 fn into_proto(&self) -> ProtoLoadGeneratorAuctionOutput {
604 match self {
605 AuctionView::Organizations => ProtoLoadGeneratorAuctionOutput::Organizations,
606 AuctionView::Users => ProtoLoadGeneratorAuctionOutput::Users,
607 AuctionView::Accounts => ProtoLoadGeneratorAuctionOutput::Accounts,
608 AuctionView::Auctions => ProtoLoadGeneratorAuctionOutput::Auctions,
609 AuctionView::Bids => ProtoLoadGeneratorAuctionOutput::Bids,
610 }
611 }
612
613 fn from_proto(proto: ProtoLoadGeneratorAuctionOutput) -> Result<Self, TryFromProtoError> {
614 Ok(match proto {
615 ProtoLoadGeneratorAuctionOutput::Organizations => AuctionView::Organizations,
616 ProtoLoadGeneratorAuctionOutput::Users => AuctionView::Users,
617 ProtoLoadGeneratorAuctionOutput::Accounts => AuctionView::Accounts,
618 ProtoLoadGeneratorAuctionOutput::Auctions => AuctionView::Auctions,
619 ProtoLoadGeneratorAuctionOutput::Bids => AuctionView::Bids,
620 })
621 }
622}
623
624impl RustType<ProtoLoadGeneratorMarketingOutput> for MarketingView {
625 fn into_proto(&self) -> ProtoLoadGeneratorMarketingOutput {
626 match self {
627 MarketingView::Customers => ProtoLoadGeneratorMarketingOutput::Customers,
628 MarketingView::Impressions => ProtoLoadGeneratorMarketingOutput::Impressions,
629 MarketingView::Clicks => ProtoLoadGeneratorMarketingOutput::Clicks,
630 MarketingView::Leads => ProtoLoadGeneratorMarketingOutput::Leads,
631 MarketingView::Coupons => ProtoLoadGeneratorMarketingOutput::Coupons,
632 MarketingView::ConversionPredictions => {
633 ProtoLoadGeneratorMarketingOutput::ConversionPredictions
634 }
635 }
636 }
637
638 fn from_proto(proto: ProtoLoadGeneratorMarketingOutput) -> Result<Self, TryFromProtoError> {
639 Ok(match proto {
640 ProtoLoadGeneratorMarketingOutput::Customers => MarketingView::Customers,
641 ProtoLoadGeneratorMarketingOutput::Impressions => MarketingView::Impressions,
642 ProtoLoadGeneratorMarketingOutput::Clicks => MarketingView::Clicks,
643 ProtoLoadGeneratorMarketingOutput::Leads => MarketingView::Leads,
644 ProtoLoadGeneratorMarketingOutput::Coupons => MarketingView::Coupons,
645 ProtoLoadGeneratorMarketingOutput::ConversionPredictions => {
646 MarketingView::ConversionPredictions
647 }
648 })
649 }
650}
651
652impl RustType<ProtoLoadGeneratorTpchOutput> for TpchView {
653 fn into_proto(&self) -> ProtoLoadGeneratorTpchOutput {
654 match self {
655 TpchView::Supplier => ProtoLoadGeneratorTpchOutput::Supplier,
656 TpchView::Part => ProtoLoadGeneratorTpchOutput::Part,
657 TpchView::Partsupp => ProtoLoadGeneratorTpchOutput::Partsupp,
658 TpchView::Customer => ProtoLoadGeneratorTpchOutput::Customer,
659 TpchView::Orders => ProtoLoadGeneratorTpchOutput::Orders,
660 TpchView::Lineitem => ProtoLoadGeneratorTpchOutput::Lineitem,
661 TpchView::Nation => ProtoLoadGeneratorTpchOutput::Nation,
662 TpchView::Region => ProtoLoadGeneratorTpchOutput::Region,
663 }
664 }
665
666 fn from_proto(proto: ProtoLoadGeneratorTpchOutput) -> Result<Self, TryFromProtoError> {
667 Ok(match proto {
668 ProtoLoadGeneratorTpchOutput::Supplier => TpchView::Supplier,
669 ProtoLoadGeneratorTpchOutput::Part => TpchView::Part,
670 ProtoLoadGeneratorTpchOutput::Partsupp => TpchView::Partsupp,
671 ProtoLoadGeneratorTpchOutput::Customer => TpchView::Customer,
672 ProtoLoadGeneratorTpchOutput::Orders => TpchView::Orders,
673 ProtoLoadGeneratorTpchOutput::Lineitem => TpchView::Lineitem,
674 ProtoLoadGeneratorTpchOutput::Nation => TpchView::Nation,
675 ProtoLoadGeneratorTpchOutput::Region => TpchView::Region,
676 })
677 }
678}
679
680impl RustType<ProtoLoadGeneratorOutput> for LoadGeneratorOutput {
681 fn into_proto(&self) -> ProtoLoadGeneratorOutput {
682 use proto_load_generator_output::Kind;
683 let kind = match self {
684 LoadGeneratorOutput::Default => Kind::Default(()),
685 LoadGeneratorOutput::Auction(view) => Kind::Auction(view.into_proto().into()),
686 LoadGeneratorOutput::Marketing(view) => Kind::Marketing(view.into_proto().into()),
687 LoadGeneratorOutput::Tpch(view) => Kind::Tpch(view.into_proto().into()),
688 };
689 ProtoLoadGeneratorOutput { kind: Some(kind) }
690 }
691
692 fn from_proto(proto: ProtoLoadGeneratorOutput) -> Result<Self, TryFromProtoError> {
693 use proto_load_generator_output::Kind;
694 Ok(match proto.kind {
695 Some(Kind::Default(())) => LoadGeneratorOutput::Default,
696 Some(Kind::Auction(view)) => LoadGeneratorOutput::Auction(
697 ProtoLoadGeneratorAuctionOutput::try_from(view)
698 .map_err(|_| {
699 TryFromProtoError::unknown_enum_variant("ProtoLoadGeneratorAuctionOutput")
700 })?
701 .into_rust()?,
702 ),
703 Some(Kind::Marketing(view)) => LoadGeneratorOutput::Marketing(
704 ProtoLoadGeneratorMarketingOutput::try_from(view)
705 .map_err(|_| {
706 TryFromProtoError::unknown_enum_variant("ProtoLoadGeneratorMarketingOutput")
707 })?
708 .into_rust()?,
709 ),
710 Some(Kind::Tpch(view)) => LoadGeneratorOutput::Tpch(
711 ProtoLoadGeneratorTpchOutput::try_from(view)
712 .map_err(|_| {
713 TryFromProtoError::unknown_enum_variant("ProtoLoadGeneratorTpchOutput")
714 })?
715 .into_rust()?,
716 ),
717 None => {
718 return Err(TryFromProtoError::missing_field(
719 "ProtoLoadGeneratorOutput::kind",
720 ));
721 }
722 })
723 }
724}
725
/// Export details for a load generator source: which output the export reads.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct LoadGeneratorSourceExportDetails {
    /// The generator output this export refers to.
    pub output: LoadGeneratorOutput,
}
730
731impl RustType<ProtoLoadGeneratorSourceExportDetails> for LoadGeneratorSourceExportDetails {
732 fn into_proto(&self) -> ProtoLoadGeneratorSourceExportDetails {
733 ProtoLoadGeneratorSourceExportDetails {
734 output: self.output.into_proto().into(),
735 }
736 }
737
738 fn from_proto(proto: ProtoLoadGeneratorSourceExportDetails) -> Result<Self, TryFromProtoError> {
739 Ok(LoadGeneratorSourceExportDetails {
740 output: proto
741 .output
742 .into_rust_if_some("ProtoLoadGeneratorSourceExportDetails::output")?,
743 })
744 }
745}
746
747impl AlterCompatible for LoadGeneratorSourceExportDetails {
748 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
749 let Self { output } = self;
750 if output != &other.output {
751 tracing::warn!(
752 "LoadGeneratorSourceExportDetails incompatible at output:\nself:\n{:#?}\n\nother\n{:#?}",
753 self,
754 other
755 );
756 return Err(AlterError { id });
757 }
758 Ok(())
759 }
760}
761
/// A load generator that can produce a stream of events.
pub trait Generator {
    /// Returns a boxed iterator of `(output, event)` pairs, where each event
    /// is an [`Event`] over `Option<MzOffset>` carrying `(Row, Diff)`
    /// payloads.
    ///
    /// NOTE(review): presumably `seed` makes the stream reproducible, `now`
    /// supplies the current time, and `resume_offset` is the offset from
    /// which generation should resume — confirm with the implementors.
    fn by_seed(
        &self,
        now: NowFn,
        seed: Option<u64>,
        resume_offset: MzOffset,
    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>>;
}
771
772impl RustType<ProtoLoadGeneratorSourceConnection> for LoadGeneratorSourceConnection {
773 fn into_proto(&self) -> ProtoLoadGeneratorSourceConnection {
774 use proto_load_generator_source_connection::Kind;
775 ProtoLoadGeneratorSourceConnection {
776 kind: Some(match &self.load_generator {
777 LoadGenerator::Auction => Kind::Auction(()),
778 LoadGenerator::Clock => Kind::Clock(()),
779 LoadGenerator::Counter { max_cardinality } => {
780 Kind::Counter(ProtoCounterLoadGenerator {
781 max_cardinality: *max_cardinality,
782 })
783 }
784 LoadGenerator::Marketing => Kind::Marketing(()),
785 LoadGenerator::Tpch {
786 count_supplier,
787 count_part,
788 count_customer,
789 count_orders,
790 count_clerk,
791 } => Kind::Tpch(ProtoTpchLoadGenerator {
792 count_supplier: *count_supplier,
793 count_part: *count_part,
794 count_customer: *count_customer,
795 count_orders: *count_orders,
796 count_clerk: *count_clerk,
797 }),
798 LoadGenerator::Datums => Kind::Datums(()),
799 LoadGenerator::KeyValue(kv) => Kind::KeyValue(kv.into_proto()),
800 }),
801 tick_micros: self.tick_micros,
802 as_of: self.as_of,
803 up_to: self.up_to,
804 }
805 }
806
807 fn from_proto(proto: ProtoLoadGeneratorSourceConnection) -> Result<Self, TryFromProtoError> {
808 use proto_load_generator_source_connection::Kind;
809 let kind = proto.kind.ok_or_else(|| {
810 TryFromProtoError::missing_field("ProtoLoadGeneratorSourceConnection::kind")
811 })?;
812 Ok(LoadGeneratorSourceConnection {
813 load_generator: match kind {
814 Kind::Auction(()) => LoadGenerator::Auction,
815 Kind::Clock(()) => LoadGenerator::Clock,
816 Kind::Counter(ProtoCounterLoadGenerator { max_cardinality }) => {
817 LoadGenerator::Counter { max_cardinality }
818 }
819 Kind::Marketing(()) => LoadGenerator::Marketing,
820 Kind::Tpch(ProtoTpchLoadGenerator {
821 count_supplier,
822 count_part,
823 count_customer,
824 count_orders,
825 count_clerk,
826 }) => LoadGenerator::Tpch {
827 count_supplier,
828 count_part,
829 count_customer,
830 count_orders,
831 count_clerk,
832 },
833 Kind::Datums(()) => LoadGenerator::Datums,
834 Kind::KeyValue(kv) => LoadGenerator::KeyValue(kv.into_rust()?),
835 },
836 tick_micros: proto.tick_micros,
837 as_of: proto.as_of,
838 up_to: proto.up_to,
839 })
840 }
841}
842
/// Options for the key-value load generator.
///
/// Round-trips through `ProtoKeyValueLoadGenerator` via its `RustType`
/// implementation in this file.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Default)]
pub struct KeyValueLoadGenerator {
    /// NOTE(review): presumably the number of distinct keys — confirm.
    pub keys: u64,
    /// Number of snapshot rounds. Prefer reading it through
    /// `transactional_snapshot_rounds` / `non_transactional_snapshot_rounds`,
    /// which attribute it according to `transactional_snapshot`.
    pub snapshot_rounds: u64,
    /// When `true`, `snapshot_rounds` is reported by
    /// `transactional_snapshot_rounds`; when `false`, by
    /// `non_transactional_snapshot_rounds`.
    pub transactional_snapshot: bool,
    /// NOTE(review): presumably the size of each generated value in bytes —
    /// confirm.
    pub value_size: u64,
    /// NOTE(review): presumably the number of partitions the keys are spread
    /// over — confirm.
    pub partitions: u64,
    /// NOTE(review): presumably the interval between update batches, with
    /// `None` meaning no rate limit — confirm.
    pub tick_interval: Option<Duration>,
    /// NOTE(review): presumably the number of keys per update batch —
    /// confirm.
    pub batch_size: u64,
    /// NOTE(review): presumably a seed for the generated data — confirm.
    pub seed: u64,
    /// When `Some`, the source's default value description gains a non-null
    /// `UInt64` column with this name (see
    /// `LoadGeneratorSourceConnection::default_value_desc`).
    pub include_offset: Option<String>,
}
870
871impl KeyValueLoadGenerator {
872 pub fn transactional_snapshot_rounds(&self) -> u64 {
874 if self.transactional_snapshot {
875 self.snapshot_rounds
876 } else {
877 0
878 }
879 }
880
881 pub fn non_transactional_snapshot_rounds(&self) -> u64 {
883 if self.transactional_snapshot {
884 0
885 } else {
886 self.snapshot_rounds
887 }
888 }
889}
890
891impl RustType<ProtoKeyValueLoadGenerator> for KeyValueLoadGenerator {
892 fn into_proto(&self) -> ProtoKeyValueLoadGenerator {
893 ProtoKeyValueLoadGenerator {
894 keys: self.keys,
895 snapshot_rounds: self.snapshot_rounds,
896 transactional_snapshot: self.transactional_snapshot,
897 value_size: self.value_size,
898 partitions: self.partitions,
899 tick_interval: self.tick_interval.into_proto(),
900 batch_size: self.batch_size,
901 seed: self.seed,
902 include_offset: self.include_offset.clone(),
903 }
904 }
905
906 fn from_proto(proto: ProtoKeyValueLoadGenerator) -> Result<Self, TryFromProtoError> {
907 Ok(Self {
908 keys: proto.keys,
909 snapshot_rounds: proto.snapshot_rounds,
910 transactional_snapshot: proto.transactional_snapshot,
911 value_size: proto.value_size,
912 partitions: proto.partitions,
913 tick_interval: proto.tick_interval.into_rust()?,
914 batch_size: proto.batch_size,
915 seed: proto.seed,
916 include_offset: proto.include_offset,
917 })
918 }
919}