mz_storage_types/sources/
load_generator.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to load generator sources
11
12use std::sync::LazyLock;
13use std::time::Duration;
14
15use mz_ore::now::NowFn;
16use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
17use mz_repr::adt::numeric::NumericMaxScale;
18use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Row, ScalarType};
19use proptest_derive::Arbitrary;
20use serde::{Deserialize, Serialize};
21use std::collections::BTreeSet;
22
23use crate::AlterCompatible;
24use crate::sources::AlterError;
25use crate::sources::{MzOffset, SourceConnection};
26
27use super::SourceExportDetails;
28
// Include a Rust source file generated at build time into `OUT_DIR`.
// NOTE(review): presumably this is the prost output for the
// `mz_storage_types.sources.load_generator` proto package, which would supply the
// `ProtoLoadGenerator*` types and the `proto_load_generator_output` module used
// below — confirm against the crate's build script.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_types.sources.load_generator.rs"
));
33
/// Default offset column name for key-value load generators.
///
/// NOTE(review): presumably used as the column name when a `KeyValue` generator is
/// asked to include its offset (cf. `KeyValueLoadGenerator::include_offset`) without
/// an explicit name — confirm against the planner.
pub const LOAD_GENERATOR_KEY_VALUE_OFFSET_DEFAULT: &str = "offset";
35
/// Data and progress events of the native stream.
///
/// `F` is a frontier: an iterable collection of timestamps. A [`Event::Message`]
/// pairs a single timestamp of type `F::Item` with its payload `D`.
#[derive(Debug)]
pub enum Event<F: IntoIterator, D> {
    /// Indicates that timestamps have advanced to frontier `F`.
    Progress(F),
    /// Indicates that event `D` happened at time `F::Item`.
    Message(F::Item, D),
}
44
/// Connection details for a load generator source.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct LoadGeneratorSourceConnection {
    /// Which generator to run, together with its generator-specific options.
    pub load_generator: LoadGenerator,
    /// Optional tick interval.
    /// NOTE(review): presumably measured in microseconds, going by the name — confirm.
    pub tick_micros: Option<u64>,
    /// NOTE(review): presumably the offset/timestamp at which generation starts — confirm.
    pub as_of: u64,
    /// NOTE(review): presumably the offset/timestamp bound up to which data is
    /// generated — confirm.
    pub up_to: u64,
}
52
53pub static LOAD_GEN_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
54    RelationDesc::builder()
55        .with_column("offset", ScalarType::UInt64.nullable(true))
56        .finish()
57});
58
59impl SourceConnection for LoadGeneratorSourceConnection {
60    fn name(&self) -> &'static str {
61        "load-generator"
62    }
63
64    fn external_reference(&self) -> Option<&str> {
65        None
66    }
67
68    fn default_key_desc(&self) -> RelationDesc {
69        match &self.load_generator {
70            LoadGenerator::KeyValue(_) => {
71                // `"key"` is overridden by the key_envelope in planning.
72                RelationDesc::builder()
73                    .with_column("key", ScalarType::UInt64.nullable(false))
74                    .finish()
75            }
76            _ => RelationDesc::empty(),
77        }
78    }
79
80    fn default_value_desc(&self) -> RelationDesc {
81        match &self.load_generator {
82            LoadGenerator::Auction => RelationDesc::empty(),
83            LoadGenerator::Clock => RelationDesc::builder()
84                .with_column(
85                    "time",
86                    ScalarType::TimestampTz { precision: None }.nullable(false),
87                )
88                .finish(),
89            LoadGenerator::Datums => {
90                let mut desc =
91                    RelationDesc::builder().with_column("rowid", ScalarType::Int64.nullable(false));
92                let typs = ScalarType::enumerate();
93                let mut names = BTreeSet::new();
94                for typ in typs {
95                    // Cut out variant information from the debug print.
96                    let mut name = format!("_{:?}", typ)
97                        .split(' ')
98                        .next()
99                        .unwrap()
100                        .to_lowercase();
101                    // Incase we ever have multiple variants of the same type, create
102                    // unique names for them.
103                    while names.contains(&name) {
104                        name.push('_');
105                    }
106                    names.insert(name.clone());
107                    desc = desc.with_column(name, typ.clone().nullable(true));
108                }
109                desc.finish()
110            }
111            LoadGenerator::Counter { .. } => RelationDesc::builder()
112                .with_column("counter", ScalarType::Int64.nullable(false))
113                .finish(),
114            LoadGenerator::Marketing => RelationDesc::empty(),
115            LoadGenerator::Tpch { .. } => RelationDesc::empty(),
116            LoadGenerator::KeyValue(KeyValueLoadGenerator { include_offset, .. }) => {
117                let mut desc = RelationDesc::builder()
118                    .with_column("partition", ScalarType::UInt64.nullable(false))
119                    .with_column("value", ScalarType::Bytes.nullable(false));
120
121                if let Some(offset_name) = include_offset.as_deref() {
122                    desc = desc.with_column(offset_name, ScalarType::UInt64.nullable(false));
123                }
124                desc.finish()
125            }
126        }
127    }
128
129    fn timestamp_desc(&self) -> RelationDesc {
130        LOAD_GEN_PROGRESS_DESC.clone()
131    }
132
133    fn connection_id(&self) -> Option<CatalogItemId> {
134        None
135    }
136
137    // Some load-gen types output to their primary collection while
138    // others do not.
139    fn primary_export_details(&self) -> SourceExportDetails {
140        match &self.load_generator {
141            LoadGenerator::Auction => SourceExportDetails::None,
142            LoadGenerator::Clock => {
143                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
144                    output: LoadGeneratorOutput::Default,
145                })
146            }
147            LoadGenerator::Datums => {
148                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
149                    output: LoadGeneratorOutput::Default,
150                })
151            }
152            LoadGenerator::Counter { .. } => {
153                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
154                    output: LoadGeneratorOutput::Default,
155                })
156            }
157            LoadGenerator::Marketing => SourceExportDetails::None,
158            LoadGenerator::Tpch { .. } => SourceExportDetails::None,
159            LoadGenerator::KeyValue(_) => {
160                SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
161                    output: LoadGeneratorOutput::Default,
162                })
163            }
164        }
165    }
166
167    fn supports_read_only(&self) -> bool {
168        true
169    }
170
171    fn prefers_single_replica(&self) -> bool {
172        false
173    }
174}
175
176impl crate::AlterCompatible for LoadGeneratorSourceConnection {}
177
/// The kind of load generator to run, along with any generator-specific options.
///
/// NOTE(review): variant order likely matters for serialization formats that
/// encode enum variants by index, so do not reorder.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum LoadGenerator {
    /// Outputs through the `organizations`, `users`, `accounts`, `auctions`
    /// and `bids` subsource views; the primary collection is empty.
    Auction,
    /// Outputs a single non-nullable `time` column on the primary collection.
    Clock,
    /// Outputs a single non-nullable `counter` column on the primary collection.
    Counter {
        /// How many values will be emitted
        /// before old ones are retracted, or `None` for
        /// an append-only collection.
        max_cardinality: Option<u64>,
    },
    /// Outputs a `rowid` column plus one nullable column per `ScalarType` on the
    /// primary collection.
    Datums,
    /// Outputs through the marketing subsource views (`customers`,
    /// `impressions`, `clicks`, `leads`, `coupons`, `conversion_predictions`).
    Marketing,
    /// Outputs through the TPC-H subsource views (`supplier`, `part`,
    /// `partsupp`, `customer`, `orders`, `lineitem`, `nation`, `region`).
    Tpch {
        // NOTE(review): presumably the number of rows to generate for the
        // corresponding TPC-H entity — confirm against the generator.
        count_supplier: i64,
        count_part: i64,
        count_customer: i64,
        count_orders: i64,
        count_clerk: i64,
    },
    /// Key-value generator configured by [`KeyValueLoadGenerator`].
    KeyValue(KeyValueLoadGenerator),
}
199
/// Database name used for load generators.
///
/// NOTE(review): presumably the database that holds the per-generator schemas
/// named by `LoadGenerator::schema_name` — confirm.
pub const LOAD_GENERATOR_DATABASE_NAME: &str = "mz_load_generators";
201
202impl LoadGenerator {
203    /// Must be kept in-sync with the same mapping on the `LoadGenerator` enum defined in
204    /// src/sql-parser/src/ast/defs/ddl.rs.
205    pub fn schema_name(&self) -> &'static str {
206        match self {
207            LoadGenerator::Counter { .. } => "counter",
208            LoadGenerator::Clock => "clock",
209            LoadGenerator::Marketing => "marketing",
210            LoadGenerator::Auction => "auction",
211            LoadGenerator::Datums => "datums",
212            LoadGenerator::Tpch { .. } => "tpch",
213            LoadGenerator::KeyValue { .. } => "key_value",
214        }
215    }
216
217    /// Returns the list of table names and their column types that this generator generates
218    pub fn views(&self) -> Vec<(&str, RelationDesc, LoadGeneratorOutput)> {
219        match self {
220            LoadGenerator::Auction => vec![
221                (
222                    "organizations",
223                    RelationDesc::builder()
224                        .with_column("id", ScalarType::Int64.nullable(false))
225                        .with_column("name", ScalarType::String.nullable(false))
226                        .with_key(vec![0])
227                        .finish(),
228                    LoadGeneratorOutput::Auction(AuctionView::Organizations),
229                ),
230                (
231                    "users",
232                    RelationDesc::builder()
233                        .with_column("id", ScalarType::Int64.nullable(false))
234                        .with_column("org_id", ScalarType::Int64.nullable(false))
235                        .with_column("name", ScalarType::String.nullable(false))
236                        .with_key(vec![0])
237                        .finish(),
238                    LoadGeneratorOutput::Auction(AuctionView::Users),
239                ),
240                (
241                    "accounts",
242                    RelationDesc::builder()
243                        .with_column("id", ScalarType::Int64.nullable(false))
244                        .with_column("org_id", ScalarType::Int64.nullable(false))
245                        .with_column("balance", ScalarType::Int64.nullable(false))
246                        .with_key(vec![0])
247                        .finish(),
248                    LoadGeneratorOutput::Auction(AuctionView::Accounts),
249                ),
250                (
251                    "auctions",
252                    RelationDesc::builder()
253                        .with_column("id", ScalarType::Int64.nullable(false))
254                        .with_column("seller", ScalarType::Int64.nullable(false))
255                        .with_column("item", ScalarType::String.nullable(false))
256                        .with_column(
257                            "end_time",
258                            ScalarType::TimestampTz { precision: None }.nullable(false),
259                        )
260                        .with_key(vec![0])
261                        .finish(),
262                    LoadGeneratorOutput::Auction(AuctionView::Auctions),
263                ),
264                (
265                    "bids",
266                    RelationDesc::builder()
267                        .with_column("id", ScalarType::Int64.nullable(false))
268                        .with_column("buyer", ScalarType::Int64.nullable(false))
269                        .with_column("auction_id", ScalarType::Int64.nullable(false))
270                        .with_column("amount", ScalarType::Int32.nullable(false))
271                        .with_column(
272                            "bid_time",
273                            ScalarType::TimestampTz { precision: None }.nullable(false),
274                        )
275                        .with_key(vec![0])
276                        .finish(),
277                    LoadGeneratorOutput::Auction(AuctionView::Bids),
278                ),
279            ],
280            LoadGenerator::Clock => vec![],
281            LoadGenerator::Counter { max_cardinality: _ } => vec![],
282            LoadGenerator::Marketing => {
283                vec![
284                    (
285                        "customers",
286                        RelationDesc::builder()
287                            .with_column("id", ScalarType::Int64.nullable(false))
288                            .with_column("email", ScalarType::String.nullable(false))
289                            .with_column("income", ScalarType::Int64.nullable(false))
290                            .with_key(vec![0])
291                            .finish(),
292                        LoadGeneratorOutput::Marketing(MarketingView::Customers),
293                    ),
294                    (
295                        "impressions",
296                        RelationDesc::builder()
297                            .with_column("id", ScalarType::Int64.nullable(false))
298                            .with_column("customer_id", ScalarType::Int64.nullable(false))
299                            .with_column("campaign_id", ScalarType::Int64.nullable(false))
300                            .with_column(
301                                "impression_time",
302                                ScalarType::TimestampTz { precision: None }.nullable(false),
303                            )
304                            .with_key(vec![0])
305                            .finish(),
306                        LoadGeneratorOutput::Marketing(MarketingView::Impressions),
307                    ),
308                    (
309                        "clicks",
310                        RelationDesc::builder()
311                            .with_column("impression_id", ScalarType::Int64.nullable(false))
312                            .with_column(
313                                "click_time",
314                                ScalarType::TimestampTz { precision: None }.nullable(false),
315                            )
316                            .finish(),
317                        LoadGeneratorOutput::Marketing(MarketingView::Clicks),
318                    ),
319                    (
320                        "leads",
321                        RelationDesc::builder()
322                            .with_column("id", ScalarType::Int64.nullable(false))
323                            .with_column("customer_id", ScalarType::Int64.nullable(false))
324                            .with_column(
325                                "created_at",
326                                ScalarType::TimestampTz { precision: None }.nullable(false),
327                            )
328                            .with_column(
329                                "converted_at",
330                                ScalarType::TimestampTz { precision: None }.nullable(true),
331                            )
332                            .with_column("conversion_amount", ScalarType::Int64.nullable(true))
333                            .with_key(vec![0])
334                            .finish(),
335                        LoadGeneratorOutput::Marketing(MarketingView::Leads),
336                    ),
337                    (
338                        "coupons",
339                        RelationDesc::builder()
340                            .with_column("id", ScalarType::Int64.nullable(false))
341                            .with_column("lead_id", ScalarType::Int64.nullable(false))
342                            .with_column(
343                                "created_at",
344                                ScalarType::TimestampTz { precision: None }.nullable(false),
345                            )
346                            .with_column("amount", ScalarType::Int64.nullable(false))
347                            .with_key(vec![0])
348                            .finish(),
349                        LoadGeneratorOutput::Marketing(MarketingView::Coupons),
350                    ),
351                    (
352                        "conversion_predictions",
353                        RelationDesc::builder()
354                            .with_column("lead_id", ScalarType::Int64.nullable(false))
355                            .with_column("experiment_bucket", ScalarType::String.nullable(false))
356                            .with_column(
357                                "predicted_at",
358                                ScalarType::TimestampTz { precision: None }.nullable(false),
359                            )
360                            .with_column("score", ScalarType::Float64.nullable(false))
361                            .finish(),
362                        LoadGeneratorOutput::Marketing(MarketingView::ConversionPredictions),
363                    ),
364                ]
365            }
366            LoadGenerator::Datums => vec![],
367            LoadGenerator::Tpch { .. } => {
368                let identifier = ScalarType::Int64.nullable(false);
369                let decimal = ScalarType::Numeric {
370                    max_scale: Some(NumericMaxScale::try_from(2i64).unwrap()),
371                }
372                .nullable(false);
373                vec![
374                    (
375                        "supplier",
376                        RelationDesc::builder()
377                            .with_column("s_suppkey", identifier.clone())
378                            .with_column("s_name", ScalarType::String.nullable(false))
379                            .with_column("s_address", ScalarType::String.nullable(false))
380                            .with_column("s_nationkey", identifier.clone())
381                            .with_column("s_phone", ScalarType::String.nullable(false))
382                            .with_column("s_acctbal", decimal.clone())
383                            .with_column("s_comment", ScalarType::String.nullable(false))
384                            .with_key(vec![0])
385                            .finish(),
386                        LoadGeneratorOutput::Tpch(TpchView::Supplier),
387                    ),
388                    (
389                        "part",
390                        RelationDesc::builder()
391                            .with_column("p_partkey", identifier.clone())
392                            .with_column("p_name", ScalarType::String.nullable(false))
393                            .with_column("p_mfgr", ScalarType::String.nullable(false))
394                            .with_column("p_brand", ScalarType::String.nullable(false))
395                            .with_column("p_type", ScalarType::String.nullable(false))
396                            .with_column("p_size", ScalarType::Int32.nullable(false))
397                            .with_column("p_container", ScalarType::String.nullable(false))
398                            .with_column("p_retailprice", decimal.clone())
399                            .with_column("p_comment", ScalarType::String.nullable(false))
400                            .with_key(vec![0])
401                            .finish(),
402                        LoadGeneratorOutput::Tpch(TpchView::Part),
403                    ),
404                    (
405                        "partsupp",
406                        RelationDesc::builder()
407                            .with_column("ps_partkey", identifier.clone())
408                            .with_column("ps_suppkey", identifier.clone())
409                            .with_column("ps_availqty", ScalarType::Int32.nullable(false))
410                            .with_column("ps_supplycost", decimal.clone())
411                            .with_column("ps_comment", ScalarType::String.nullable(false))
412                            .with_key(vec![0, 1])
413                            .finish(),
414                        LoadGeneratorOutput::Tpch(TpchView::Partsupp),
415                    ),
416                    (
417                        "customer",
418                        RelationDesc::builder()
419                            .with_column("c_custkey", identifier.clone())
420                            .with_column("c_name", ScalarType::String.nullable(false))
421                            .with_column("c_address", ScalarType::String.nullable(false))
422                            .with_column("c_nationkey", identifier.clone())
423                            .with_column("c_phone", ScalarType::String.nullable(false))
424                            .with_column("c_acctbal", decimal.clone())
425                            .with_column("c_mktsegment", ScalarType::String.nullable(false))
426                            .with_column("c_comment", ScalarType::String.nullable(false))
427                            .with_key(vec![0])
428                            .finish(),
429                        LoadGeneratorOutput::Tpch(TpchView::Customer),
430                    ),
431                    (
432                        "orders",
433                        RelationDesc::builder()
434                            .with_column("o_orderkey", identifier.clone())
435                            .with_column("o_custkey", identifier.clone())
436                            .with_column("o_orderstatus", ScalarType::String.nullable(false))
437                            .with_column("o_totalprice", decimal.clone())
438                            .with_column("o_orderdate", ScalarType::Date.nullable(false))
439                            .with_column("o_orderpriority", ScalarType::String.nullable(false))
440                            .with_column("o_clerk", ScalarType::String.nullable(false))
441                            .with_column("o_shippriority", ScalarType::Int32.nullable(false))
442                            .with_column("o_comment", ScalarType::String.nullable(false))
443                            .with_key(vec![0])
444                            .finish(),
445                        LoadGeneratorOutput::Tpch(TpchView::Orders),
446                    ),
447                    (
448                        "lineitem",
449                        RelationDesc::builder()
450                            .with_column("l_orderkey", identifier.clone())
451                            .with_column("l_partkey", identifier.clone())
452                            .with_column("l_suppkey", identifier.clone())
453                            .with_column("l_linenumber", ScalarType::Int32.nullable(false))
454                            .with_column("l_quantity", decimal.clone())
455                            .with_column("l_extendedprice", decimal.clone())
456                            .with_column("l_discount", decimal.clone())
457                            .with_column("l_tax", decimal)
458                            .with_column("l_returnflag", ScalarType::String.nullable(false))
459                            .with_column("l_linestatus", ScalarType::String.nullable(false))
460                            .with_column("l_shipdate", ScalarType::Date.nullable(false))
461                            .with_column("l_commitdate", ScalarType::Date.nullable(false))
462                            .with_column("l_receiptdate", ScalarType::Date.nullable(false))
463                            .with_column("l_shipinstruct", ScalarType::String.nullable(false))
464                            .with_column("l_shipmode", ScalarType::String.nullable(false))
465                            .with_column("l_comment", ScalarType::String.nullable(false))
466                            .with_key(vec![0, 3])
467                            .finish(),
468                        LoadGeneratorOutput::Tpch(TpchView::Lineitem),
469                    ),
470                    (
471                        "nation",
472                        RelationDesc::builder()
473                            .with_column("n_nationkey", identifier.clone())
474                            .with_column("n_name", ScalarType::String.nullable(false))
475                            .with_column("n_regionkey", identifier.clone())
476                            .with_column("n_comment", ScalarType::String.nullable(false))
477                            .with_key(vec![0])
478                            .finish(),
479                        LoadGeneratorOutput::Tpch(TpchView::Nation),
480                    ),
481                    (
482                        "region",
483                        RelationDesc::builder()
484                            .with_column("r_regionkey", identifier)
485                            .with_column("r_name", ScalarType::String.nullable(false))
486                            .with_column("r_comment", ScalarType::String.nullable(false))
487                            .with_key(vec![0])
488                            .finish(),
489                        LoadGeneratorOutput::Tpch(TpchView::Region),
490                    ),
491                ]
492            }
493            LoadGenerator::KeyValue(_) => vec![],
494        }
495    }
496
497    pub fn is_monotonic(&self) -> bool {
498        match self {
499            LoadGenerator::Auction => true,
500            LoadGenerator::Clock => false,
501            LoadGenerator::Counter {
502                max_cardinality: None,
503            } => true,
504            LoadGenerator::Counter { .. } => false,
505            LoadGenerator::Marketing => false,
506            LoadGenerator::Datums => true,
507            LoadGenerator::Tpch { .. } => false,
508            LoadGenerator::KeyValue(_) => true,
509        }
510    }
511}
512
/// Identifies a view of a load-generator source, so that the source dataflow
/// can output data to the correct data output for a source export that uses
/// this view.
///
/// Variant declaration order determines the derived `PartialOrd`/`Ord`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum LoadGeneratorOutput {
    /// Used for outputting to the primary source output.
    Default,
    /// One of the subsource views of `LoadGenerator::Auction`.
    Auction(AuctionView),
    /// One of the subsource views of `LoadGenerator::Marketing`.
    Marketing(MarketingView),
    /// One of the subsource views of `LoadGenerator::Tpch`.
    Tpch(TpchView),
}
524
/// Identifies one of the subsource views produced by `LoadGenerator::Auction`.
///
/// Variant declaration order determines the derived `PartialOrd`/`Ord`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum AuctionView {
    /// The `organizations` view.
    Organizations,
    /// The `users` view.
    Users,
    /// The `accounts` view.
    Accounts,
    /// The `auctions` view.
    Auctions,
    /// The `bids` view.
    Bids,
}
533
/// Identifies one of the subsource views produced by `LoadGenerator::Marketing`.
///
/// Variant declaration order determines the derived `PartialOrd`/`Ord`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum MarketingView {
    /// The `customers` view.
    Customers,
    /// The `impressions` view.
    Impressions,
    /// The `clicks` view.
    Clicks,
    /// The `leads` view.
    Leads,
    /// The `coupons` view.
    Coupons,
    /// The `conversion_predictions` view.
    ConversionPredictions,
}
543
/// Identifies one of the subsource views produced by `LoadGenerator::Tpch`.
///
/// Variant declaration order determines the derived `PartialOrd`/`Ord`.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary, PartialOrd, Ord)]
pub enum TpchView {
    /// The `supplier` view.
    Supplier,
    /// The `part` view.
    Part,
    /// The `partsupp` view.
    Partsupp,
    /// The `customer` view.
    Customer,
    /// The `orders` view.
    Orders,
    /// The `lineitem` view.
    Lineitem,
    /// The `nation` view.
    Nation,
    /// The `region` view.
    Region,
}
555
// TODO(roshan): Remove these str mappings once the migration
// `ast_rewrite_create_load_gen_subsource_details` is removed;
// they are only needed by that migration and should not be used otherwise.
559impl From<&str> for AuctionView {
560    fn from(s: &str) -> Self {
561        match s {
562            "organizations" => AuctionView::Organizations,
563            "users" => AuctionView::Users,
564            "accounts" => AuctionView::Accounts,
565            "auctions" => AuctionView::Auctions,
566            "bids" => AuctionView::Bids,
567            _ => panic!("unexpected load generator output name: {}", s),
568        }
569    }
570}
571
572impl From<&str> for MarketingView {
573    fn from(s: &str) -> Self {
574        match s {
575            "customers" => MarketingView::Customers,
576            "impressions" => MarketingView::Impressions,
577            "clicks" => MarketingView::Clicks,
578            "leads" => MarketingView::Leads,
579            "coupons" => MarketingView::Coupons,
580            "conversion_predictions" => MarketingView::ConversionPredictions,
581            _ => panic!("unexpected load generator output name: {}", s),
582        }
583    }
584}
585
586impl From<&str> for TpchView {
587    fn from(s: &str) -> Self {
588        match s {
589            "supplier" => TpchView::Supplier,
590            "part" => TpchView::Part,
591            "partsupp" => TpchView::Partsupp,
592            "customer" => TpchView::Customer,
593            "orders" => TpchView::Orders,
594            "lineitem" => TpchView::Lineitem,
595            "nation" => TpchView::Nation,
596            "region" => TpchView::Region,
597            _ => panic!("unexpected load generator output name: {}", s),
598        }
599    }
600}
601
602impl RustType<ProtoLoadGeneratorAuctionOutput> for AuctionView {
603    fn into_proto(&self) -> ProtoLoadGeneratorAuctionOutput {
604        match self {
605            AuctionView::Organizations => ProtoLoadGeneratorAuctionOutput::Organizations,
606            AuctionView::Users => ProtoLoadGeneratorAuctionOutput::Users,
607            AuctionView::Accounts => ProtoLoadGeneratorAuctionOutput::Accounts,
608            AuctionView::Auctions => ProtoLoadGeneratorAuctionOutput::Auctions,
609            AuctionView::Bids => ProtoLoadGeneratorAuctionOutput::Bids,
610        }
611    }
612
613    fn from_proto(proto: ProtoLoadGeneratorAuctionOutput) -> Result<Self, TryFromProtoError> {
614        Ok(match proto {
615            ProtoLoadGeneratorAuctionOutput::Organizations => AuctionView::Organizations,
616            ProtoLoadGeneratorAuctionOutput::Users => AuctionView::Users,
617            ProtoLoadGeneratorAuctionOutput::Accounts => AuctionView::Accounts,
618            ProtoLoadGeneratorAuctionOutput::Auctions => AuctionView::Auctions,
619            ProtoLoadGeneratorAuctionOutput::Bids => AuctionView::Bids,
620        })
621    }
622}
623
624impl RustType<ProtoLoadGeneratorMarketingOutput> for MarketingView {
625    fn into_proto(&self) -> ProtoLoadGeneratorMarketingOutput {
626        match self {
627            MarketingView::Customers => ProtoLoadGeneratorMarketingOutput::Customers,
628            MarketingView::Impressions => ProtoLoadGeneratorMarketingOutput::Impressions,
629            MarketingView::Clicks => ProtoLoadGeneratorMarketingOutput::Clicks,
630            MarketingView::Leads => ProtoLoadGeneratorMarketingOutput::Leads,
631            MarketingView::Coupons => ProtoLoadGeneratorMarketingOutput::Coupons,
632            MarketingView::ConversionPredictions => {
633                ProtoLoadGeneratorMarketingOutput::ConversionPredictions
634            }
635        }
636    }
637
638    fn from_proto(proto: ProtoLoadGeneratorMarketingOutput) -> Result<Self, TryFromProtoError> {
639        Ok(match proto {
640            ProtoLoadGeneratorMarketingOutput::Customers => MarketingView::Customers,
641            ProtoLoadGeneratorMarketingOutput::Impressions => MarketingView::Impressions,
642            ProtoLoadGeneratorMarketingOutput::Clicks => MarketingView::Clicks,
643            ProtoLoadGeneratorMarketingOutput::Leads => MarketingView::Leads,
644            ProtoLoadGeneratorMarketingOutput::Coupons => MarketingView::Coupons,
645            ProtoLoadGeneratorMarketingOutput::ConversionPredictions => {
646                MarketingView::ConversionPredictions
647            }
648        })
649    }
650}
651
652impl RustType<ProtoLoadGeneratorTpchOutput> for TpchView {
653    fn into_proto(&self) -> ProtoLoadGeneratorTpchOutput {
654        match self {
655            TpchView::Supplier => ProtoLoadGeneratorTpchOutput::Supplier,
656            TpchView::Part => ProtoLoadGeneratorTpchOutput::Part,
657            TpchView::Partsupp => ProtoLoadGeneratorTpchOutput::Partsupp,
658            TpchView::Customer => ProtoLoadGeneratorTpchOutput::Customer,
659            TpchView::Orders => ProtoLoadGeneratorTpchOutput::Orders,
660            TpchView::Lineitem => ProtoLoadGeneratorTpchOutput::Lineitem,
661            TpchView::Nation => ProtoLoadGeneratorTpchOutput::Nation,
662            TpchView::Region => ProtoLoadGeneratorTpchOutput::Region,
663        }
664    }
665
666    fn from_proto(proto: ProtoLoadGeneratorTpchOutput) -> Result<Self, TryFromProtoError> {
667        Ok(match proto {
668            ProtoLoadGeneratorTpchOutput::Supplier => TpchView::Supplier,
669            ProtoLoadGeneratorTpchOutput::Part => TpchView::Part,
670            ProtoLoadGeneratorTpchOutput::Partsupp => TpchView::Partsupp,
671            ProtoLoadGeneratorTpchOutput::Customer => TpchView::Customer,
672            ProtoLoadGeneratorTpchOutput::Orders => TpchView::Orders,
673            ProtoLoadGeneratorTpchOutput::Lineitem => TpchView::Lineitem,
674            ProtoLoadGeneratorTpchOutput::Nation => TpchView::Nation,
675            ProtoLoadGeneratorTpchOutput::Region => TpchView::Region,
676        })
677    }
678}
679
680impl RustType<ProtoLoadGeneratorOutput> for LoadGeneratorOutput {
681    fn into_proto(&self) -> ProtoLoadGeneratorOutput {
682        use proto_load_generator_output::Kind;
683        let kind = match self {
684            LoadGeneratorOutput::Default => Kind::Default(()),
685            LoadGeneratorOutput::Auction(view) => Kind::Auction(view.into_proto().into()),
686            LoadGeneratorOutput::Marketing(view) => Kind::Marketing(view.into_proto().into()),
687            LoadGeneratorOutput::Tpch(view) => Kind::Tpch(view.into_proto().into()),
688        };
689        ProtoLoadGeneratorOutput { kind: Some(kind) }
690    }
691
692    fn from_proto(proto: ProtoLoadGeneratorOutput) -> Result<Self, TryFromProtoError> {
693        use proto_load_generator_output::Kind;
694        Ok(match proto.kind {
695            Some(Kind::Default(())) => LoadGeneratorOutput::Default,
696            Some(Kind::Auction(view)) => LoadGeneratorOutput::Auction(
697                ProtoLoadGeneratorAuctionOutput::try_from(view)
698                    .map_err(|_| {
699                        TryFromProtoError::unknown_enum_variant("ProtoLoadGeneratorAuctionOutput")
700                    })?
701                    .into_rust()?,
702            ),
703            Some(Kind::Marketing(view)) => LoadGeneratorOutput::Marketing(
704                ProtoLoadGeneratorMarketingOutput::try_from(view)
705                    .map_err(|_| {
706                        TryFromProtoError::unknown_enum_variant("ProtoLoadGeneratorMarketingOutput")
707                    })?
708                    .into_rust()?,
709            ),
710            Some(Kind::Tpch(view)) => LoadGeneratorOutput::Tpch(
711                ProtoLoadGeneratorTpchOutput::try_from(view)
712                    .map_err(|_| {
713                        TryFromProtoError::unknown_enum_variant("ProtoLoadGeneratorTpchOutput")
714                    })?
715                    .into_rust()?,
716            ),
717            None => {
718                return Err(TryFromProtoError::missing_field(
719                    "ProtoLoadGeneratorOutput::kind",
720                ));
721            }
722        })
723    }
724}
725
726#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
727pub struct LoadGeneratorSourceExportDetails {
728    pub output: LoadGeneratorOutput,
729}
730
731impl RustType<ProtoLoadGeneratorSourceExportDetails> for LoadGeneratorSourceExportDetails {
732    fn into_proto(&self) -> ProtoLoadGeneratorSourceExportDetails {
733        ProtoLoadGeneratorSourceExportDetails {
734            output: self.output.into_proto().into(),
735        }
736    }
737
738    fn from_proto(proto: ProtoLoadGeneratorSourceExportDetails) -> Result<Self, TryFromProtoError> {
739        Ok(LoadGeneratorSourceExportDetails {
740            output: proto
741                .output
742                .into_rust_if_some("ProtoLoadGeneratorSourceExportDetails::output")?,
743        })
744    }
745}
746
747impl AlterCompatible for LoadGeneratorSourceExportDetails {
748    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
749        let Self { output } = self;
750        if output != &other.output {
751            tracing::warn!(
752                "LoadGeneratorSourceExportDetails incompatible at output:\nself:\n{:#?}\n\nother\n{:#?}",
753                self,
754                other
755            );
756            return Err(AlterError { id });
757        }
758        Ok(())
759    }
760}
761
/// A generator of load generator data, producing rows tagged with the output they
/// belong to.
pub trait Generator {
    /// Returns a boxed iterator that produces rows and progress information.
    ///
    /// Each item pairs the [`LoadGeneratorOutput`] it belongs to with an [`Event`]:
    /// either `Event::Message(offset, (row, diff))`, an update at an [`MzOffset`], or
    /// `Event::Progress(frontier)`, where the frontier is an `Option<MzOffset>`.
    ///
    /// `now` is the clock handle passed to the generator. `seed` is an optional seed;
    /// presumably it drives any randomness, so confirm per implementation. `resume_offset`
    /// is the offset generation should resume from. NOTE(review): confirm per
    /// implementation whether offsets before it are skipped or replayed.
    fn by_seed(
        &self,
        now: NowFn,
        seed: Option<u64>,
        resume_offset: MzOffset,
    ) -> Box<dyn Iterator<Item = (LoadGeneratorOutput, Event<Option<MzOffset>, (Row, Diff)>)>>;
}
771
772impl RustType<ProtoLoadGeneratorSourceConnection> for LoadGeneratorSourceConnection {
773    fn into_proto(&self) -> ProtoLoadGeneratorSourceConnection {
774        use proto_load_generator_source_connection::Kind;
775        ProtoLoadGeneratorSourceConnection {
776            kind: Some(match &self.load_generator {
777                LoadGenerator::Auction => Kind::Auction(()),
778                LoadGenerator::Clock => Kind::Clock(()),
779                LoadGenerator::Counter { max_cardinality } => {
780                    Kind::Counter(ProtoCounterLoadGenerator {
781                        max_cardinality: *max_cardinality,
782                    })
783                }
784                LoadGenerator::Marketing => Kind::Marketing(()),
785                LoadGenerator::Tpch {
786                    count_supplier,
787                    count_part,
788                    count_customer,
789                    count_orders,
790                    count_clerk,
791                } => Kind::Tpch(ProtoTpchLoadGenerator {
792                    count_supplier: *count_supplier,
793                    count_part: *count_part,
794                    count_customer: *count_customer,
795                    count_orders: *count_orders,
796                    count_clerk: *count_clerk,
797                }),
798                LoadGenerator::Datums => Kind::Datums(()),
799                LoadGenerator::KeyValue(kv) => Kind::KeyValue(kv.into_proto()),
800            }),
801            tick_micros: self.tick_micros,
802            as_of: self.as_of,
803            up_to: self.up_to,
804        }
805    }
806
807    fn from_proto(proto: ProtoLoadGeneratorSourceConnection) -> Result<Self, TryFromProtoError> {
808        use proto_load_generator_source_connection::Kind;
809        let kind = proto.kind.ok_or_else(|| {
810            TryFromProtoError::missing_field("ProtoLoadGeneratorSourceConnection::kind")
811        })?;
812        Ok(LoadGeneratorSourceConnection {
813            load_generator: match kind {
814                Kind::Auction(()) => LoadGenerator::Auction,
815                Kind::Clock(()) => LoadGenerator::Clock,
816                Kind::Counter(ProtoCounterLoadGenerator { max_cardinality }) => {
817                    LoadGenerator::Counter { max_cardinality }
818                }
819                Kind::Marketing(()) => LoadGenerator::Marketing,
820                Kind::Tpch(ProtoTpchLoadGenerator {
821                    count_supplier,
822                    count_part,
823                    count_customer,
824                    count_orders,
825                    count_clerk,
826                }) => LoadGenerator::Tpch {
827                    count_supplier,
828                    count_part,
829                    count_customer,
830                    count_orders,
831                    count_clerk,
832                },
833                Kind::Datums(()) => LoadGenerator::Datums,
834                Kind::KeyValue(kv) => LoadGenerator::KeyValue(kv.into_rust()?),
835            },
836            tick_micros: proto.tick_micros,
837            as_of: proto.as_of,
838            up_to: proto.up_to,
839        })
840    }
841}
842
843#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Default)]
844pub struct KeyValueLoadGenerator {
845    /// The keyspace of the source.
846    pub keys: u64,
847    /// The number of rounds to emit values for each key in the snapshot.
848    /// This lets users scale the snapshot size independent of the keyspace.
849    ///
850    /// Please use `transactional_snapshot` and `non_transactional_snapshot_rounds`.
851    pub snapshot_rounds: u64,
852    /// When false, this lets us quickly produce updates, as opposed to a single-value
853    /// per key during a transactional snapshot
854    pub transactional_snapshot: bool,
855    /// The number of random bytes for each value.
856    pub value_size: u64,
857    /// The number of partitions. The keyspace is spread evenly across the partitions.
858    /// This lets users scale the concurrency of the source independently of the replica size.
859    pub partitions: u64,
860    /// If provided, the maximum rate at which new batches of updates, per-partition will be
861    /// produced after the snapshot.
862    pub tick_interval: Option<Duration>,
863    /// The number of keys in each update batch.
864    pub batch_size: u64,
865    /// A per-source seed.
866    pub seed: u64,
867    /// Whether or not to include the offset in the value. The string is the column name.
868    pub include_offset: Option<String>,
869}
870
871impl KeyValueLoadGenerator {
872    /// The number of transactional snapshot rounds.
873    pub fn transactional_snapshot_rounds(&self) -> u64 {
874        if self.transactional_snapshot {
875            self.snapshot_rounds
876        } else {
877            0
878        }
879    }
880
881    /// The number of non-transactional snapshot rounds.
882    pub fn non_transactional_snapshot_rounds(&self) -> u64 {
883        if self.transactional_snapshot {
884            0
885        } else {
886            self.snapshot_rounds
887        }
888    }
889}
890
891impl RustType<ProtoKeyValueLoadGenerator> for KeyValueLoadGenerator {
892    fn into_proto(&self) -> ProtoKeyValueLoadGenerator {
893        ProtoKeyValueLoadGenerator {
894            keys: self.keys,
895            snapshot_rounds: self.snapshot_rounds,
896            transactional_snapshot: self.transactional_snapshot,
897            value_size: self.value_size,
898            partitions: self.partitions,
899            tick_interval: self.tick_interval.into_proto(),
900            batch_size: self.batch_size,
901            seed: self.seed,
902            include_offset: self.include_offset.clone(),
903        }
904    }
905
906    fn from_proto(proto: ProtoKeyValueLoadGenerator) -> Result<Self, TryFromProtoError> {
907        Ok(Self {
908            keys: proto.keys,
909            snapshot_rounds: proto.snapshot_rounds,
910            transactional_snapshot: proto.transactional_snapshot,
911            value_size: proto.value_size,
912            partitions: proto.partitions,
913            tick_interval: proto.tick_interval.into_rust()?,
914            batch_size: proto.batch_size,
915            seed: proto.seed,
916            include_offset: proto.include_offset,
917        })
918    }
919}