
mz_compute/sink/copy_to_s3_oneshot.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;

use differential_dataflow::{Hashable, VecCollection};
use mz_compute_client::protocol::response::CopyToResponse;
use mz_compute_types::dyncfgs::{
    COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
    COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO,
};
use mz_compute_types::sinks::{ComputeSinkDesc, CopyToS3OneshotSinkConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_timely_util::operator::consolidate_pact;
use mz_timely_util::probe::{Handle, ProbeNotify};
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use crate::render::StartSignal;
use crate::render::errors::DataflowErrorSer;
use crate::render::sinks::SinkRender;
use crate::typedefs::KeyBatcher;

impl<'scope> SinkRender<'scope> for CopyToS3OneshotSinkConnection {
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        _as_of: Antichain<Timestamp>,
        _start_signal: StartSignal,
        sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
        err_collection: VecCollection<'scope, Timestamp, DataflowErrorSer, Diff>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // Set up a callback to communicate the result of the copy-to operation to the controller.
        let mut response_protocol = ResponseProtocol {
            sink_id,
            response_buffer: Some(Rc::clone(&compute_state.copy_to_response_buffer)),
        };
        let result_callback = move |count: Result<u64, String>| {
            response_protocol.send(count);
        };

        // Split the data across a known number of batches to distribute load across the
        // cluster. Each worker handles the data belonging to zero or more batches. Keying
        // on the batch ID lets us write files to S3 deterministically across replicas of
        // different sizes. Each worker further splits a batch's data into one or more
        // files based on the user-provided `MAX_FILE_SIZE`.
        let batch_count = self.output_batch_count;
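        // As an illustrative (hypothetical) example: with `batch_count = 4`, a row whose
        // hash is 10 is routed to batch 10 % 4 = 2 on every replica, independent of how
        // many workers that replica has.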

        // We exchange the data by batch ID, but we don't send the batch ID itself to the
        // sink; the sink can recompute it from the data.
        let input = consolidate_pact::<KeyBatcher<_, _, _>, _>(
            sinked_collection.map(move |row| (row, ())).inner,
            Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
            "Consolidated COPY TO S3 input",
        )
        .probe_notify_with(vec![output_probe.clone()]);

        // We need to consolidate the error collection to ensure we don't act on retracted errors.
        let error = consolidate_pact::<KeyBatcher<_, _, _>, _>(
            err_collection.map(move |err| (err, ())).inner,
            Exchange::new(move |((err, _), _, _): &((DataflowErrorSer, _), _, _)| {
                err.hashed() % batch_count
            }),
            "Consolidated COPY TO S3 errors",
        );

        // We can only propagate a single error back to the client, so restrict the error
        // collection to the first error at a time before the sink's `up_to`. This avoids
        // sending the full error collection to the next operator, and checking against
        // `up_to` ensures we don't accidentally forward an irrelevant error.
        let error_stream =
            error.unary_frontier(Pipeline, "COPY TO S3 error filtering", |_cap, _info| {
                let up_to = sink.up_to.clone();
                let mut received_one = false;
                move |(input, _), output| {
                    input.for_each_time(|time, data| {
                        if !up_to.less_equal(time.time()) && !received_one {
                            received_one = true;
                            output.session(&time).give_iterator(
                                data.flatten()
                                    .flatten()
                                    .flat_map(|chunk| chunk.iter().cloned())
                                    .next()
                                    .map(|((err, ()), time, diff)| (err, time, diff))
                                    .into_iter(),
                            );
                        }
                    });
                }
            });
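        // For example (hypothetical values): with `up_to = Antichain::from_elem(5)`, an
        // error at time 3 satisfies `!up_to.less_equal(&3)` and is forwarded, while an
        // error at time 7 is at or beyond `up_to` and is ignored.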

        let params = mz_storage_operators::s3_oneshot_sink::CopyToParameters {
            parquet_row_group_ratio: COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO
                .get(&compute_state.worker_config),
            arrow_builder_buffer_ratio: COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO
                .get(&compute_state.worker_config),
            s3_multipart_part_size_bytes: COPY_TO_S3_MULTIPART_PART_SIZE_BYTES
                .get(&compute_state.worker_config),
        };
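        // Illustrative (hypothetical) numbers: with `s3_multipart_part_size_bytes` set to
        // 8 MiB, a 100 MiB object would be uploaded as ceil(100 / 8) = 13 multipart parts.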

        // Deserialize DataflowErrorSer back to DataflowError at the boundary
        // with storage-operators, which expects DataflowError.
        use timely::dataflow::operators::vec::Map;
        let error_stream = error_stream.map(|(err, time, diff)| (err.deserialize(), time, diff));

        let enforce_external_addresses =
            mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES.get(&compute_state.worker_config);

        let token = mz_storage_operators::s3_oneshot_sink::copy_to(
            input,
            error_stream,
            sink.up_to.clone(),
            self.upload_info.clone(),
            compute_state.context.connection_context.clone(),
            self.aws_connection.clone(),
            sink_id,
            self.connection_id,
            params,
            result_callback,
            self.output_batch_count,
            enforce_external_addresses,
        );

        Some(token)
    }
}

/// A type that manages sending the resulting row count (or error) of the copy-to
/// operation back to the coordinator.
struct ResponseProtocol {
    pub sink_id: GlobalId,
    pub response_buffer: Option<Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>>,
}

impl ResponseProtocol {
    // This method must only be called once; calling it again will panic.
    fn send(&mut self, count: Result<u64, String>) {
        // The dataflow's input has been exhausted; clear the channel so that
        // dropping `self` does not also send `CopyToResponse::Dropped`.
        let buffer = self.response_buffer.take().expect("expect response buffer");
        let response = match count {
            Ok(count) => CopyToResponse::RowCount(count),
            Err(error) => CopyToResponse::Error(error),
        };
        buffer.borrow_mut().push((self.sink_id, response));
    }
}

impl Drop for ResponseProtocol {
    fn drop(&mut self) {
        if let Some(buffer) = self.response_buffer.take() {
            buffer
                .borrow_mut()
                .push((self.sink_id, CopyToResponse::Dropped));
        }
    }
}
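
// A minimal illustrative sketch of the send-once contract of `ResponseProtocol`:
// `send` takes the response buffer, so a later drop pushes nothing further,
// while dropping without sending reports `CopyToResponse::Dropped`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn response_protocol_sends_once() {
        let buffer = Rc::new(RefCell::new(Vec::new()));
        let sink_id = GlobalId::User(1);

        // Sending a result consumes the buffer, so the drop below pushes nothing.
        let mut protocol = ResponseProtocol {
            sink_id,
            response_buffer: Some(Rc::clone(&buffer)),
        };
        protocol.send(Ok(42));
        drop(protocol);
        assert_eq!(buffer.borrow().len(), 1);
        assert!(matches!(buffer.borrow()[0].1, CopyToResponse::RowCount(42)));

        // Dropping without sending pushes `CopyToResponse::Dropped` instead.
        let protocol = ResponseProtocol {
            sink_id,
            response_buffer: Some(Rc::clone(&buffer)),
        };
        drop(protocol);
        assert!(matches!(buffer.borrow()[1].1, CopyToResponse::Dropped));
    }
}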