Skip to main content

mz_compute/sink/
copy_to_s3_oneshot.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::any::Any;
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use differential_dataflow::{Hashable, VecCollection};
15use mz_compute_client::protocol::response::CopyToResponse;
16use mz_compute_types::dyncfgs::{
17    COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
18    COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO,
19};
20use mz_compute_types::sinks::{ComputeSinkDesc, CopyToS3OneshotSinkConnection};
21use mz_repr::{Diff, GlobalId, Row, Timestamp};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::operator::consolidate_pact;
25use mz_timely_util::probe::{Handle, ProbeNotify};
26use timely::dataflow::channels::pact::{Exchange, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::progress::Antichain;
29
30use crate::render::StartSignal;
31use crate::render::sinks::SinkRender;
32use crate::typedefs::KeyBatcher;
33
34impl<'scope> SinkRender<'scope> for CopyToS3OneshotSinkConnection {
35    fn render_sink(
36        &self,
37        compute_state: &mut crate::compute_state::ComputeState,
38        sink: &ComputeSinkDesc<CollectionMetadata>,
39        sink_id: GlobalId,
40        _as_of: Antichain<Timestamp>,
41        _start_signal: StartSignal,
42        sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
43        err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
44        _ct_times: Option<VecCollection<'scope, Timestamp, (), Diff>>,
45        output_probe: &Handle<Timestamp>,
46    ) -> Option<Rc<dyn Any>> {
47        // Set up a callback to communicate the result of the copy-to operation to the controller.
48        let mut response_protocol = ResponseProtocol {
49            sink_id,
50            response_buffer: Some(Rc::clone(&compute_state.copy_to_response_buffer)),
51        };
52        let result_callback = move |count: Result<u64, String>| {
53            response_protocol.send(count);
54        };
55
56        // Splitting the data across a known number of batches to distribute load across the cluster.
57        // Each worker will be handling data belonging to 0 or more batches. We are doing this so that
58        // we can write files to s3 deterministically across different replicas of different sizes
59        // using the batch ID. Each worker will split a batch's data into 1 or more
60        // files based on the user provided `MAX_FILE_SIZE`.
61        let batch_count = self.output_batch_count;
62
63        // We exchange the data according to batch, but we don't want to send the batch ID to the
64        // sink. The sink can re-compute the batch ID from the data.
65        let input = consolidate_pact::<KeyBatcher<_, _, _>, _>(
66            sinked_collection.map(move |row| (row, ())).inner,
67            Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
68            "Consolidated COPY TO S3 input",
69        )
70        .probe_notify_with(vec![output_probe.clone()]);
71
72        // We need to consolidate the error collection to ensure we don't act on retracted errors.
73        let error = consolidate_pact::<KeyBatcher<_, _, _>, _>(
74            err_collection.map(move |err| (err, ())).inner,
75            Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
76                err.hashed() % batch_count
77            }),
78            "Consolidated COPY TO S3 errors",
79        );
80
81        // We can only propagate the one error back to the client, so filter the error
82        // collection to the first error that is before the sink 'up_to' to avoid
83        // sending the full error collection to the next operator. We ensure we find the
84        // first error before the 'up_to' to avoid accidentally sending an irrelevant error.
85        let error_stream =
86            error.unary_frontier(Pipeline, "COPY TO S3 error filtering", |_cap, _info| {
87                let up_to = sink.up_to.clone();
88                let mut received_one = false;
89                move |(input, _), output| {
90                    input.for_each_time(|time, data| {
91                        if !up_to.less_equal(time.time()) && !received_one {
92                            received_one = true;
93                            output.session(&time).give_iterator(
94                                data.flatten()
95                                    .flatten()
96                                    .flat_map(|chunk| chunk.iter().cloned())
97                                    .next()
98                                    .map(|((err, ()), time, diff)| (err, time, diff))
99                                    .into_iter(),
100                            );
101                        }
102                    });
103                }
104            });
105
106        let params = mz_storage_operators::s3_oneshot_sink::CopyToParameters {
107            parquet_row_group_ratio: COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO
108                .get(&compute_state.worker_config),
109            arrow_builder_buffer_ratio: COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO
110                .get(&compute_state.worker_config),
111            s3_multipart_part_size_bytes: COPY_TO_S3_MULTIPART_PART_SIZE_BYTES
112                .get(&compute_state.worker_config),
113        };
114
115        let token = mz_storage_operators::s3_oneshot_sink::copy_to(
116            input,
117            error_stream,
118            sink.up_to.clone(),
119            self.upload_info.clone(),
120            compute_state.context.connection_context.clone(),
121            self.aws_connection.clone(),
122            sink_id,
123            self.connection_id,
124            params,
125            result_callback,
126            self.output_batch_count,
127        );
128
129        Some(token)
130    }
131}
132
133/// A type that guides the transmission of number of rows back to the coordinator.
134struct ResponseProtocol {
135    pub sink_id: GlobalId,
136    pub response_buffer: Option<Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>>,
137}
138
139impl ResponseProtocol {
140    // This method should only be called once otherwise this will panic.
141    fn send(&mut self, count: Result<u64, String>) {
142        // The dataflow's input has been exhausted, clear the channel,
143        // to avoid sending `CopyToResponse::Dropped`.
144        let buffer = self.response_buffer.take().expect("expect response buffer");
145        let response = match count {
146            Ok(count) => CopyToResponse::RowCount(count),
147            Err(error) => CopyToResponse::Error(error),
148        };
149        buffer.borrow_mut().push((self.sink_id, response));
150    }
151}
152
153impl Drop for ResponseProtocol {
154    fn drop(&mut self) {
155        if let Some(buffer) = self.response_buffer.take() {
156            buffer
157                .borrow_mut()
158                .push((self.sink_id, CopyToResponse::Dropped));
159        }
160    }
161}