mz_compute_types/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types for describing dataflow sinks.
11
12use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
13use mz_repr::refresh_schedule::RefreshSchedule;
14use mz_repr::{CatalogItemId, GlobalId, RelationDesc, Timestamp};
15use mz_storage_types::connections::aws::AwsConnection;
16use mz_storage_types::controller::CollectionMetadata;
17use mz_storage_types::sinks::S3UploadInfo;
18use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy, any};
19use proptest_derive::Arbitrary;
20use serde::{Deserialize, Serialize};
21use timely::progress::Antichain;
22
// Pull in code generated into Cargo's `OUT_DIR` at build time. The `Proto*`
// message types and the `proto_compute_sink_connection` module used below are
// not defined in this file, so they presumably come from this include — confirm
// against the crate's build script.
include!(concat!(env!("OUT_DIR"), "/mz_compute_types.sinks.rs"));
24
/// A sink for updates to a relational collection.
///
/// Fields shared by every kind of compute sink live here; kind-specific
/// configuration lives in [`connection`](Self::connection).
///
/// In this file `S` is instantiated with either `()` (the default) or
/// [`CollectionMetadata`]; only the latter has a protobuf encoding here.
/// `T` is the timestamp type of the `up_to` frontier.
// TODO(database-issues#7533): Confirm the hedged field documentation below.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ComputeSinkDesc<S: 'static = (), T = Timestamp> {
    /// The ID of the collection the sink reads from (inferred from the
    /// field name — confirm).
    pub from: GlobalId,
    /// The [`RelationDesc`] of the `from` collection (presumably its schema —
    /// confirm).
    pub from_desc: RelationDesc,
    /// The kind of sink and its kind-specific configuration; see
    /// [`ComputeSinkConnection`].
    pub connection: ComputeSinkConnection<S>,
    /// NOTE(review): presumably whether the sink emits the initial snapshot of
    /// `from` in addition to subsequent changes — confirm.
    pub with_snapshot: bool,
    /// A frontier of times bounding the sink's output. NOTE(review):
    /// presumably updates at or beyond this frontier are not written — confirm.
    pub up_to: Antichain<T>,
    /// NOTE(review): presumably indexes of columns in `from_desc` that are
    /// asserted to be non-null — confirm.
    pub non_null_assertions: Vec<usize>,
    /// The sink's optional [`RefreshSchedule`]; `None` when it has none.
    pub refresh_schedule: Option<RefreshSchedule>,
}
43
44impl Arbitrary for ComputeSinkDesc<CollectionMetadata, Timestamp> {
45    type Strategy = BoxedStrategy<Self>;
46    type Parameters = ();
47
48    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
49        (
50            any::<GlobalId>(),
51            any::<RelationDesc>(),
52            any::<ComputeSinkConnection<CollectionMetadata>>(),
53            any::<bool>(),
54            proptest::collection::vec(any::<Timestamp>(), 1..4),
55            proptest::collection::vec(any::<usize>(), 0..4),
56            proptest::option::of(any::<RefreshSchedule>()),
57        )
58            .prop_map(
59                |(
60                    from,
61                    from_desc,
62                    connection,
63                    with_snapshot,
64                    up_to_frontier,
65                    non_null_assertions,
66                    refresh_schedule,
67                )| {
68                    ComputeSinkDesc {
69                        from,
70                        from_desc,
71                        connection,
72                        with_snapshot,
73                        up_to: Antichain::from(up_to_frontier),
74                        non_null_assertions,
75                        refresh_schedule,
76                    }
77                },
78            )
79            .boxed()
80    }
81}
82
83impl Arbitrary for ComputeSinkDesc<(), Timestamp> {
84    type Strategy = BoxedStrategy<Self>;
85    type Parameters = ();
86
87    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
88        (
89            any::<GlobalId>(),
90            any::<RelationDesc>(),
91            any::<ComputeSinkConnection<()>>(),
92            any::<bool>(),
93            proptest::collection::vec(any::<Timestamp>(), 1..4),
94            proptest::collection::vec(any::<usize>(), 0..4),
95            proptest::option::of(any::<RefreshSchedule>()),
96        )
97            .prop_map(
98                |(
99                    from,
100                    from_desc,
101                    connection,
102                    with_snapshot,
103                    up_to_frontier,
104                    non_null_assertions,
105                    refresh_schedule,
106                )| {
107                    ComputeSinkDesc {
108                        from,
109                        from_desc,
110                        connection,
111                        with_snapshot,
112                        up_to: Antichain::from(up_to_frontier),
113                        non_null_assertions,
114                        refresh_schedule,
115                    }
116                },
117            )
118            .boxed()
119    }
120}
121
122impl RustType<ProtoComputeSinkDesc> for ComputeSinkDesc<CollectionMetadata, Timestamp> {
123    fn into_proto(&self) -> ProtoComputeSinkDesc {
124        ProtoComputeSinkDesc {
125            connection: Some(self.connection.into_proto()),
126            from: Some(self.from.into_proto()),
127            from_desc: Some(self.from_desc.into_proto()),
128            with_snapshot: self.with_snapshot,
129            up_to: Some(self.up_to.into_proto()),
130            non_null_assertions: self.non_null_assertions.into_proto(),
131            refresh_schedule: self.refresh_schedule.into_proto(),
132        }
133    }
134
135    fn from_proto(proto: ProtoComputeSinkDesc) -> Result<Self, TryFromProtoError> {
136        Ok(ComputeSinkDesc {
137            from: proto.from.into_rust_if_some("ProtoComputeSinkDesc::from")?,
138            from_desc: proto
139                .from_desc
140                .into_rust_if_some("ProtoComputeSinkDesc::from_desc")?,
141            connection: proto
142                .connection
143                .into_rust_if_some("ProtoComputeSinkDesc::connection")?,
144            with_snapshot: proto.with_snapshot,
145            up_to: proto
146                .up_to
147                .into_rust_if_some("ProtoComputeSinkDesc::up_to")?,
148            non_null_assertions: proto.non_null_assertions.into_rust()?,
149            refresh_schedule: proto.refresh_schedule.into_rust()?,
150        })
151    }
152}
153
/// The kind of a compute sink, together with its kind-specific configuration.
///
/// `S` is the storage metadata type carried by the variants that write to a
/// collection (`MaterializedView` and `ContinualTask`).
// TODO(database-issues#7533): Expand documentation.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ComputeSinkConnection<S: 'static = ()> {
    /// A subscribe sink. It carries no configuration of its own; see
    /// [`SubscribeSinkConnection`].
    Subscribe(SubscribeSinkConnection),
    /// A materialized view sink; see [`MaterializedViewSinkConnection`].
    MaterializedView(MaterializedViewSinkConnection<S>),
    /// ContinualTask-specific information necessary for rendering a
    /// ContinualTask sink.
    ContinualTask(ContinualTaskConnection<S>),
    /// A compute sink to do a oneshot copy to s3.
    CopyToS3Oneshot(CopyToS3OneshotSinkConnection),
}
167
168impl<S> ComputeSinkConnection<S> {
169    /// Returns the name of the sink connection.
170    pub fn name(&self) -> &'static str {
171        match self {
172            ComputeSinkConnection::Subscribe(_) => "subscribe",
173            ComputeSinkConnection::MaterializedView(_) => "materialized_view",
174            ComputeSinkConnection::ContinualTask(_) => "continual_task",
175            ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot",
176        }
177    }
178
179    /// True if the sink is a subscribe, which is differently recoverable than other sinks.
180    pub fn is_subscribe(&self) -> bool {
181        if let ComputeSinkConnection::Subscribe(_) = self {
182            true
183        } else {
184            false
185        }
186    }
187}
188
189impl RustType<ProtoComputeSinkConnection> for ComputeSinkConnection<CollectionMetadata> {
190    fn into_proto(&self) -> ProtoComputeSinkConnection {
191        use proto_compute_sink_connection::Kind;
192        ProtoComputeSinkConnection {
193            kind: Some(match self {
194                ComputeSinkConnection::Subscribe(_) => Kind::Subscribe(()),
195                ComputeSinkConnection::MaterializedView(materialized_view) => {
196                    Kind::MaterializedView(materialized_view.into_proto())
197                }
198                ComputeSinkConnection::ContinualTask(continual_task) => {
199                    Kind::ContinualTask(continual_task.into_proto())
200                }
201                ComputeSinkConnection::CopyToS3Oneshot(s3) => {
202                    Kind::CopyToS3Oneshot(s3.into_proto())
203                }
204            }),
205        }
206    }
207
208    fn from_proto(proto: ProtoComputeSinkConnection) -> Result<Self, TryFromProtoError> {
209        use proto_compute_sink_connection::Kind;
210        let kind = proto
211            .kind
212            .ok_or_else(|| TryFromProtoError::missing_field("ProtoComputeSinkConnection::kind"))?;
213        Ok(match kind {
214            Kind::Subscribe(_) => ComputeSinkConnection::Subscribe(SubscribeSinkConnection {}),
215            Kind::MaterializedView(materialized_view) => {
216                ComputeSinkConnection::MaterializedView(materialized_view.into_rust()?)
217            }
218            Kind::ContinualTask(continual_task) => {
219                ComputeSinkConnection::ContinualTask(continual_task.into_rust()?)
220            }
221            Kind::CopyToS3Oneshot(s3) => ComputeSinkConnection::CopyToS3Oneshot(s3.into_rust()?),
222        })
223    }
224}
225
/// Configuration for a subscribe sink (`ComputeSinkConnection::Subscribe`).
///
/// It has no fields; its protobuf encoding is the unit-valued `Subscribe`
/// kind of `ProtoComputeSinkConnection`.
// TODO(database-issues#7533): Expand documentation.
#[derive(Arbitrary, Default, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SubscribeSinkConnection {}
229
/// Connection attributes required to do a oneshot copy to S3
/// (`ComputeSinkConnection::CopyToS3Oneshot`).
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CopyToS3OneshotSinkConnection {
    /// Information specific to the upload.
    pub upload_info: S3UploadInfo,
    /// The AWS connection information to do the writes.
    pub aws_connection: AwsConnection,
    /// The ID of the Connection object, used to generate the External ID when
    /// using AssumeRole with AWS connection.
    pub connection_id: CatalogItemId,
    /// The number of batches the COPY TO output will be divided into,
    /// where each worker will process 0 or more batches of data.
    pub output_batch_count: u64,
}
244
245impl RustType<ProtoCopyToS3OneshotSinkConnection> for CopyToS3OneshotSinkConnection {
246    fn into_proto(&self) -> ProtoCopyToS3OneshotSinkConnection {
247        ProtoCopyToS3OneshotSinkConnection {
248            upload_info: Some(self.upload_info.into_proto()),
249            aws_connection: Some(self.aws_connection.into_proto()),
250            connection_id: Some(self.connection_id.into_proto()),
251            output_batch_count: self.output_batch_count,
252        }
253    }
254
255    fn from_proto(proto: ProtoCopyToS3OneshotSinkConnection) -> Result<Self, TryFromProtoError> {
256        Ok(CopyToS3OneshotSinkConnection {
257            upload_info: proto
258                .upload_info
259                .into_rust_if_some("ProtoCopyToS3OneshotSinkConnection::upload_info")?,
260            aws_connection: proto
261                .aws_connection
262                .into_rust_if_some("ProtoCopyToS3OneshotSinkConnection::aws_connection")?,
263            connection_id: proto
264                .connection_id
265                .into_rust_if_some("ProtoCopyToS3OneshotSinkConnection::connection_id")?,
266            output_batch_count: proto.output_batch_count,
267        })
268    }
269}
270
/// Configuration for a materialized view sink
/// (`ComputeSinkConnection::MaterializedView`).
// TODO(database-issues#7533): Expand documentation.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct MaterializedViewSinkConnection<S> {
    /// A [`RelationDesc`]; NOTE(review): presumably the schema of the values
    /// written by the sink — confirm.
    pub value_desc: RelationDesc,
    /// Storage metadata of type `S`; in this file only the
    /// `CollectionMetadata` instantiation has a protobuf encoding.
    pub storage_metadata: S,
}
279
280impl RustType<ProtoMaterializedViewSinkConnection>
281    for MaterializedViewSinkConnection<CollectionMetadata>
282{
283    fn into_proto(&self) -> ProtoMaterializedViewSinkConnection {
284        ProtoMaterializedViewSinkConnection {
285            value_desc: Some(self.value_desc.into_proto()),
286            storage_metadata: Some(self.storage_metadata.into_proto()),
287        }
288    }
289
290    fn from_proto(proto: ProtoMaterializedViewSinkConnection) -> Result<Self, TryFromProtoError> {
291        Ok(MaterializedViewSinkConnection {
292            value_desc: proto
293                .value_desc
294                .into_rust_if_some("ProtoMaterializedViewSinkConnection::value_desc")?,
295            storage_metadata: proto
296                .storage_metadata
297                .into_rust_if_some("ProtoMaterializedViewSinkConnection::storage_metadata")?,
298        })
299    }
300}
301
/// ContinualTask-specific information necessary for rendering a ContinualTask
/// sink. (Information shared by all sinks, such as `from` and `up_to`, is
/// instead stored on [`ComputeSinkDesc`].)
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ContinualTaskConnection<S> {
    /// The id of the (for now) single input to this CT.
    //
    // TODO(ct3): This can be removed once we render the "input" sources without
    // the hack.
    pub input_id: GlobalId,
    /// The necessary storage information for writing to the output collection.
    pub storage_metadata: S,
}
314
315impl RustType<ProtoContinualTaskConnection> for ContinualTaskConnection<CollectionMetadata> {
316    fn into_proto(&self) -> ProtoContinualTaskConnection {
317        ProtoContinualTaskConnection {
318            input_id: Some(self.input_id.into_proto()),
319            storage_metadata: Some(self.storage_metadata.into_proto()),
320        }
321    }
322
323    fn from_proto(proto: ProtoContinualTaskConnection) -> Result<Self, TryFromProtoError> {
324        Ok(ContinualTaskConnection {
325            input_id: proto
326                .input_id
327                .into_rust_if_some("ProtoContinualTaskConnection::input_id")?,
328            storage_metadata: proto
329                .storage_metadata
330                .into_rust_if_some("ProtoContinualTaskConnection::output_metadata")?,
331        })
332    }
333}