Skip to main content

mz_compute_types/
sinks.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types for describing dataflow sinks.
11
12use mz_expr::ColumnOrder;
13use mz_repr::refresh_schedule::RefreshSchedule;
14use mz_repr::{CatalogItemId, GlobalId, RelationDesc, Timestamp};
15use mz_storage_types::connections::aws::AwsConnection;
16use mz_storage_types::sinks::S3UploadInfo;
17use serde::{Deserialize, Serialize};
18use timely::progress::Antichain;
19
20/// A sink for updates to a relational collection.
21#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
22pub struct ComputeSinkDesc<S: 'static = (), T = Timestamp> {
23    /// TODO(database-issues#7533): Add documentation.
24    pub from: GlobalId,
25    /// TODO(database-issues#7533): Add documentation.
26    pub from_desc: RelationDesc,
27    /// TODO(database-issues#7533): Add documentation.
28    pub connection: ComputeSinkConnection<S>,
29    /// TODO(database-issues#7533): Add documentation.
30    pub with_snapshot: bool,
31    /// TODO(database-issues#7533): Add documentation.
32    pub up_to: Antichain<T>,
33    /// TODO(database-issues#7533): Add documentation.
34    pub non_null_assertions: Vec<usize>,
35    /// TODO(database-issues#7533): Add documentation.
36    pub refresh_schedule: Option<RefreshSchedule>,
37}
38
39/// TODO(database-issues#7533): Add documentation.
40#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
41pub enum ComputeSinkConnection<S: 'static = ()> {
42    /// TODO(database-issues#7533): Add documentation.
43    Subscribe(SubscribeSinkConnection),
44    /// TODO(database-issues#7533): Add documentation.
45    MaterializedView(MaterializedViewSinkConnection<S>),
46    /// ContinualTask-specific information necessary for rendering a
47    /// ContinualTask sink.
48    ContinualTask(ContinualTaskConnection<S>),
49    /// A compute sink to do a oneshot copy to s3.
50    CopyToS3Oneshot(CopyToS3OneshotSinkConnection),
51}
52
53impl<S> ComputeSinkConnection<S> {
54    /// Returns the name of the sink connection.
55    pub fn name(&self) -> &'static str {
56        match self {
57            ComputeSinkConnection::Subscribe(_) => "subscribe",
58            ComputeSinkConnection::MaterializedView(_) => "materialized_view",
59            ComputeSinkConnection::ContinualTask(_) => "continual_task",
60            ComputeSinkConnection::CopyToS3Oneshot(_) => "copy_to_s3_oneshot",
61        }
62    }
63
64    /// True if the sink is a subscribe, which is differently recoverable than other sinks.
65    pub fn is_subscribe(&self) -> bool {
66        if let ComputeSinkConnection::Subscribe(_) = self {
67            true
68        } else {
69            false
70        }
71    }
72}
73
74/// TODO(database-issues#7533): Add documentation.
75#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
76pub struct SubscribeSinkConnection {
77    /// An ordering for the data in the subscribe.
78    pub output: Vec<ColumnOrder>,
79}
80
81/// Connection attributes required to do a oneshot copy to s3.
82#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
83pub struct CopyToS3OneshotSinkConnection {
84    /// Information specific to the upload.
85    pub upload_info: S3UploadInfo,
86    /// The AWS connection information to do the writes.
87    pub aws_connection: AwsConnection,
88    /// The ID of the Connection object, used to generate the External ID when
89    /// using AssumeRole with AWS connection.
90    pub connection_id: CatalogItemId,
91    /// The number of batches the COPY TO output will be divided into
92    /// where each worker will process 0 or more batches of data.
93    pub output_batch_count: u64,
94}
95
96/// TODO(database-issues#7533): Add documentation.
97#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
98pub struct MaterializedViewSinkConnection<S> {
99    /// TODO(database-issues#7533): Add documentation.
100    pub value_desc: RelationDesc,
101    /// TODO(database-issues#7533): Add documentation.
102    pub storage_metadata: S,
103}
104
105/// ContinualTask-specific information necessary for rendering a ContinualTask
106/// sink. (Shared-sink information is instead stored on ComputeSinkConnection.)
107#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
108pub struct ContinualTaskConnection<S> {
109    /// The id of the (for now) single input to this CT.
110    //
111    // TODO(ct3): This can be removed once we render the "input" sources without
112    // the hack.
113    pub input_id: GlobalId,
114    /// The necessary storage information for writing to the output collection.
115    pub storage_metadata: S,
116}