mz_compute_types/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for describing dataflow sinks.
11
12use mz_repr::refresh_schedule::RefreshSchedule;
13use mz_repr::{CatalogItemId, GlobalId, RelationDesc, Timestamp};
14use mz_storage_types::connections::aws::AwsConnection;
15use mz_storage_types::sinks::S3UploadInfo;
16use serde::{Deserialize, Serialize};
17use timely::progress::Antichain;
18
/// A sink for updates to a relational collection.
///
/// The type parameter `S` is forwarded unchanged to [`ComputeSinkConnection`].
/// `T` is the element type of the `up_to` frontier and defaults to
/// [`Timestamp`].
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ComputeSinkDesc<S: 'static = (), T = Timestamp> {
    /// Presumably the ID of the collection whose updates feed this sink.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub from: GlobalId,
    /// Presumably the relation description (schema) of the `from` collection.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub from_desc: RelationDesc,
    /// The kind of sink, along with its kind-specific attributes; see
    /// [`ComputeSinkConnection`] for the available variants.
    pub connection: ComputeSinkConnection<S>,
    /// Presumably whether the sink should also emit the initial snapshot of
    /// the collection, rather than only the changes that follow it.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub with_snapshot: bool,
    /// A frontier over `T`; presumably the bound at or beyond which the sink
    /// no longer emits updates.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub up_to: Antichain<T>,
    /// Presumably the indexes of the columns that are asserted to be
    /// non-null.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub non_null_assertions: Vec<usize>,
    /// An optional [`RefreshSchedule`] for the sink; `None` presumably means
    /// that the sink does not follow a refresh schedule.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub refresh_schedule: Option<RefreshSchedule>,
}
37
/// The kind of a compute sink, together with the attributes specific to that
/// kind.
///
/// `S` is the type of the storage metadata carried by the `MaterializedView`
/// and `ContinualTask` variants; the other variants do not use it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ComputeSinkConnection<S: 'static = ()> {
    /// A subscribe sink; see [`SubscribeSinkConnection`].
    Subscribe(SubscribeSinkConnection),
    /// A materialized view sink; see [`MaterializedViewSinkConnection`].
    MaterializedView(MaterializedViewSinkConnection<S>),
    /// ContinualTask-specific information necessary for rendering a
    /// ContinualTask sink.
    ContinualTask(ContinualTaskConnection<S>),
    /// A compute sink to do a oneshot copy to S3.
    CopyToS3Oneshot(CopyToS3OneshotSinkConnection),
}
51
52impl<S> ComputeSinkConnection<S> {
53    /// Returns the name of the sink connection.
54    pub fn name(&self) -> &'static str {
55        match self {
56            ComputeSinkConnection::Subscribe(_) => "subscribe",
57            ComputeSinkConnection::MaterializedView(_) => "materialized_view",
58            ComputeSinkConnection::ContinualTask(_) => "continual_task",
59            ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot",
60        }
61    }
62
63    /// True if the sink is a subscribe, which is differently recoverable than other sinks.
64    pub fn is_subscribe(&self) -> bool {
65        if let ComputeSinkConnection::Subscribe(_) = self {
66            true
67        } else {
68            false
69        }
70    }
71}
72
/// Connection attributes for a subscribe sink.
///
/// This struct currently has no fields; it is the payload of
/// `ComputeSinkConnection::Subscribe`.
#[derive(Default, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SubscribeSinkConnection {}
76
/// Connection attributes required to do a oneshot copy to S3.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CopyToS3OneshotSinkConnection {
    /// Information specific to the upload.
    pub upload_info: S3UploadInfo,
    /// The AWS connection information to do the writes.
    pub aws_connection: AwsConnection,
    /// The ID of the Connection object, used to generate the External ID when
    /// using AssumeRole with an AWS connection.
    pub connection_id: CatalogItemId,
    /// The number of batches the COPY TO output will be divided into,
    /// where each worker will process zero or more batches of data.
    pub output_batch_count: u64,
}
91
/// Connection attributes for a materialized view sink.
///
/// `S` is the type of the storage metadata carried in `storage_metadata`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MaterializedViewSinkConnection<S> {
    /// Presumably the relation description (schema) of the rows written by
    /// the sink.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub value_desc: RelationDesc,
    /// Storage metadata of type `S`; presumably the information needed to
    /// write to the output collection.
    ///
    /// TODO(database-issues#7533): Confirm and complete this documentation.
    pub storage_metadata: S,
}
100
/// ContinualTask-specific information necessary for rendering a ContinualTask
/// sink. (Shared-sink information is instead stored on ComputeSinkConnection.)
///
/// `S` is the type of the storage metadata carried in `storage_metadata`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ContinualTaskConnection<S> {
    /// The ID of the (for now) single input to this CT.
    //
    // TODO(ct3): This can be removed once we render the "input" sources without
    // the hack.
    pub input_id: GlobalId,
    /// The necessary storage information for writing to the output collection.
    pub storage_metadata: S,
}