1use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
13use mz_repr::refresh_schedule::RefreshSchedule;
14use mz_repr::{CatalogItemId, GlobalId, RelationDesc, Timestamp};
15use mz_storage_types::connections::aws::AwsConnection;
16use mz_storage_types::controller::CollectionMetadata;
17use mz_storage_types::sinks::S3UploadInfo;
18use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy, any};
19use proptest_derive::Arbitrary;
20use serde::{Deserialize, Serialize};
21use timely::progress::Antichain;
22
23include!(concat!(env!("OUT_DIR"), "/mz_compute_types.sinks.rs"));
24
25#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
27pub struct ComputeSinkDesc<S: 'static = (), T = Timestamp> {
28 pub from: GlobalId,
30 pub from_desc: RelationDesc,
32 pub connection: ComputeSinkConnection<S>,
34 pub with_snapshot: bool,
36 pub up_to: Antichain<T>,
38 pub non_null_assertions: Vec<usize>,
40 pub refresh_schedule: Option<RefreshSchedule>,
42}
43
44impl Arbitrary for ComputeSinkDesc<CollectionMetadata, Timestamp> {
45 type Strategy = BoxedStrategy<Self>;
46 type Parameters = ();
47
48 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
49 (
50 any::<GlobalId>(),
51 any::<RelationDesc>(),
52 any::<ComputeSinkConnection<CollectionMetadata>>(),
53 any::<bool>(),
54 proptest::collection::vec(any::<Timestamp>(), 1..4),
55 proptest::collection::vec(any::<usize>(), 0..4),
56 proptest::option::of(any::<RefreshSchedule>()),
57 )
58 .prop_map(
59 |(
60 from,
61 from_desc,
62 connection,
63 with_snapshot,
64 up_to_frontier,
65 non_null_assertions,
66 refresh_schedule,
67 )| {
68 ComputeSinkDesc {
69 from,
70 from_desc,
71 connection,
72 with_snapshot,
73 up_to: Antichain::from(up_to_frontier),
74 non_null_assertions,
75 refresh_schedule,
76 }
77 },
78 )
79 .boxed()
80 }
81}
82
83impl Arbitrary for ComputeSinkDesc<(), Timestamp> {
84 type Strategy = BoxedStrategy<Self>;
85 type Parameters = ();
86
87 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
88 (
89 any::<GlobalId>(),
90 any::<RelationDesc>(),
91 any::<ComputeSinkConnection<()>>(),
92 any::<bool>(),
93 proptest::collection::vec(any::<Timestamp>(), 1..4),
94 proptest::collection::vec(any::<usize>(), 0..4),
95 proptest::option::of(any::<RefreshSchedule>()),
96 )
97 .prop_map(
98 |(
99 from,
100 from_desc,
101 connection,
102 with_snapshot,
103 up_to_frontier,
104 non_null_assertions,
105 refresh_schedule,
106 )| {
107 ComputeSinkDesc {
108 from,
109 from_desc,
110 connection,
111 with_snapshot,
112 up_to: Antichain::from(up_to_frontier),
113 non_null_assertions,
114 refresh_schedule,
115 }
116 },
117 )
118 .boxed()
119 }
120}
121
122impl RustType<ProtoComputeSinkDesc> for ComputeSinkDesc<CollectionMetadata, Timestamp> {
123 fn into_proto(&self) -> ProtoComputeSinkDesc {
124 ProtoComputeSinkDesc {
125 connection: Some(self.connection.into_proto()),
126 from: Some(self.from.into_proto()),
127 from_desc: Some(self.from_desc.into_proto()),
128 with_snapshot: self.with_snapshot,
129 up_to: Some(self.up_to.into_proto()),
130 non_null_assertions: self.non_null_assertions.into_proto(),
131 refresh_schedule: self.refresh_schedule.into_proto(),
132 }
133 }
134
135 fn from_proto(proto: ProtoComputeSinkDesc) -> Result<Self, TryFromProtoError> {
136 Ok(ComputeSinkDesc {
137 from: proto.from.into_rust_if_some("ProtoComputeSinkDesc::from")?,
138 from_desc: proto
139 .from_desc
140 .into_rust_if_some("ProtoComputeSinkDesc::from_desc")?,
141 connection: proto
142 .connection
143 .into_rust_if_some("ProtoComputeSinkDesc::connection")?,
144 with_snapshot: proto.with_snapshot,
145 up_to: proto
146 .up_to
147 .into_rust_if_some("ProtoComputeSinkDesc::up_to")?,
148 non_null_assertions: proto.non_null_assertions.into_rust()?,
149 refresh_schedule: proto.refresh_schedule.into_rust()?,
150 })
151 }
152}
153
154#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
156pub enum ComputeSinkConnection<S: 'static = ()> {
157 Subscribe(SubscribeSinkConnection),
159 MaterializedView(MaterializedViewSinkConnection<S>),
161 ContinualTask(ContinualTaskConnection<S>),
164 CopyToS3Oneshot(CopyToS3OneshotSinkConnection),
166}
167
168impl<S> ComputeSinkConnection<S> {
169 pub fn name(&self) -> &'static str {
171 match self {
172 ComputeSinkConnection::Subscribe(_) => "subscribe",
173 ComputeSinkConnection::MaterializedView(_) => "materialized_view",
174 ComputeSinkConnection::ContinualTask(_) => "continual_task",
175 ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot",
176 }
177 }
178
179 pub fn is_subscribe(&self) -> bool {
181 if let ComputeSinkConnection::Subscribe(_) = self {
182 true
183 } else {
184 false
185 }
186 }
187}
188
189impl RustType<ProtoComputeSinkConnection> for ComputeSinkConnection<CollectionMetadata> {
190 fn into_proto(&self) -> ProtoComputeSinkConnection {
191 use proto_compute_sink_connection::Kind;
192 ProtoComputeSinkConnection {
193 kind: Some(match self {
194 ComputeSinkConnection::Subscribe(_) => Kind::Subscribe(()),
195 ComputeSinkConnection::MaterializedView(materialized_view) => {
196 Kind::MaterializedView(materialized_view.into_proto())
197 }
198 ComputeSinkConnection::ContinualTask(continual_task) => {
199 Kind::ContinualTask(continual_task.into_proto())
200 }
201 ComputeSinkConnection::CopyToS3Oneshot(s3) => {
202 Kind::CopyToS3Oneshot(s3.into_proto())
203 }
204 }),
205 }
206 }
207
208 fn from_proto(proto: ProtoComputeSinkConnection) -> Result<Self, TryFromProtoError> {
209 use proto_compute_sink_connection::Kind;
210 let kind = proto
211 .kind
212 .ok_or_else(|| TryFromProtoError::missing_field("ProtoComputeSinkConnection::kind"))?;
213 Ok(match kind {
214 Kind::Subscribe(_) => ComputeSinkConnection::Subscribe(SubscribeSinkConnection {}),
215 Kind::MaterializedView(materialized_view) => {
216 ComputeSinkConnection::MaterializedView(materialized_view.into_rust()?)
217 }
218 Kind::ContinualTask(continual_task) => {
219 ComputeSinkConnection::ContinualTask(continual_task.into_rust()?)
220 }
221 Kind::CopyToS3Oneshot(s3) => ComputeSinkConnection::CopyToS3Oneshot(s3.into_rust()?),
222 })
223 }
224}
225
226#[derive(Arbitrary, Default, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
228pub struct SubscribeSinkConnection {}
229
230#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
232pub struct CopyToS3OneshotSinkConnection {
233 pub upload_info: S3UploadInfo,
235 pub aws_connection: AwsConnection,
237 pub connection_id: CatalogItemId,
240 pub output_batch_count: u64,
243}
244
245impl RustType<ProtoCopyToS3OneshotSinkConnection> for CopyToS3OneshotSinkConnection {
246 fn into_proto(&self) -> ProtoCopyToS3OneshotSinkConnection {
247 ProtoCopyToS3OneshotSinkConnection {
248 upload_info: Some(self.upload_info.into_proto()),
249 aws_connection: Some(self.aws_connection.into_proto()),
250 connection_id: Some(self.connection_id.into_proto()),
251 output_batch_count: self.output_batch_count,
252 }
253 }
254
255 fn from_proto(proto: ProtoCopyToS3OneshotSinkConnection) -> Result<Self, TryFromProtoError> {
256 Ok(CopyToS3OneshotSinkConnection {
257 upload_info: proto
258 .upload_info
259 .into_rust_if_some("ProtoCopyToS3OneshotSinkConnection::upload_info")?,
260 aws_connection: proto
261 .aws_connection
262 .into_rust_if_some("ProtoCopyToS3OneshotSinkConnection::aws_connection")?,
263 connection_id: proto
264 .connection_id
265 .into_rust_if_some("ProtoCopyToS3OneshotSinkConnection::connection_id")?,
266 output_batch_count: proto.output_batch_count,
267 })
268 }
269}
270
271#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
273pub struct MaterializedViewSinkConnection<S> {
274 pub value_desc: RelationDesc,
276 pub storage_metadata: S,
278}
279
280impl RustType<ProtoMaterializedViewSinkConnection>
281 for MaterializedViewSinkConnection<CollectionMetadata>
282{
283 fn into_proto(&self) -> ProtoMaterializedViewSinkConnection {
284 ProtoMaterializedViewSinkConnection {
285 value_desc: Some(self.value_desc.into_proto()),
286 storage_metadata: Some(self.storage_metadata.into_proto()),
287 }
288 }
289
290 fn from_proto(proto: ProtoMaterializedViewSinkConnection) -> Result<Self, TryFromProtoError> {
291 Ok(MaterializedViewSinkConnection {
292 value_desc: proto
293 .value_desc
294 .into_rust_if_some("ProtoMaterializedViewSinkConnection::value_desc")?,
295 storage_metadata: proto
296 .storage_metadata
297 .into_rust_if_some("ProtoMaterializedViewSinkConnection::storage_metadata")?,
298 })
299 }
300}
301
302#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
305pub struct ContinualTaskConnection<S> {
306 pub input_id: GlobalId,
311 pub storage_metadata: S,
313}
314
315impl RustType<ProtoContinualTaskConnection> for ContinualTaskConnection<CollectionMetadata> {
316 fn into_proto(&self) -> ProtoContinualTaskConnection {
317 ProtoContinualTaskConnection {
318 input_id: Some(self.input_id.into_proto()),
319 storage_metadata: Some(self.storage_metadata.into_proto()),
320 }
321 }
322
323 fn from_proto(proto: ProtoContinualTaskConnection) -> Result<Self, TryFromProtoError> {
324 Ok(ContinualTaskConnection {
325 input_id: proto
326 .input_id
327 .into_rust_if_some("ProtoContinualTaskConnection::input_id")?,
328 storage_metadata: proto
329 .storage_metadata
330 .into_rust_if_some("ProtoContinualTaskConnection::output_metadata")?,
331 })
332 }
333}