mz_compute/sink/copy_to_s3_oneshot.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;

use differential_dataflow::{Collection, Hashable};
use mz_compute_client::protocol::response::CopyToResponse;
use mz_compute_types::dyncfgs::{
    COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
    COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO,
};
use mz_compute_types::sinks::{ComputeSinkDesc, CopyToS3OneshotSinkConnection};
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::consolidate_pact;
use mz_timely_util::probe::{Handle, ProbeNotify};
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Operator;
use timely::progress::Antichain;

use crate::render::StartSignal;
use crate::render::sinks::SinkRender;
use crate::typedefs::KeyBatcher;

impl<G> SinkRender<G> for CopyToS3OneshotSinkConnection
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        _as_of: Antichain<Timestamp>,
        _start_signal: StartSignal,
        sinked_collection: Collection<G, Row, Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
        _ct_times: Option<Collection<G, (), Diff>>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // Set up a callback to communicate the result of the copy-to operation to the controller.
        let mut response_protocol = ResponseProtocol {
            sink_id,
            response_buffer: Some(Rc::clone(&compute_state.copy_to_response_buffer)),
        };
        let result_callback = move |count: Result<u64, String>| {
            response_protocol.send(count);
        };

        // Split the data across a known number of batches to distribute load across the
        // cluster. Each worker handles the data belonging to zero or more batches. Keying
        // the exchange on the batch ID lets replicas of different sizes write files to S3
        // deterministically. Each worker then splits a batch's data into one or more files
        // based on the user-provided `MAX_FILE_SIZE`.
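        // For example, with an output batch count of 16, a row whose hash is 37 is routed
        // to batch 37 % 16 = 5, and every replica routes that row to the same batch.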
        let batch_count = self.output_batch_count;

        // We exchange the data according to batch, but we don't want to send the batch ID to the
        // sink. The sink can re-compute the batch ID from the data.
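        // The consolidated input is attached to the caller-provided output probe below, so
        // its frontier progress is observable outside this dataflow.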
        let input = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
            &sinked_collection.map(move |row| (row, ())).inner,
            Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
            "Consolidated COPY TO S3 input",
        )
        .probe_notify_with(vec![output_probe.clone()]);

        // We need to consolidate the error collection to ensure we don't act on retracted errors.
        let error = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
            &err_collection.map(move |err| (err, ())).inner,
            Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
                err.hashed() % batch_count
            }),
            "Consolidated COPY TO S3 errors",
        );

        // Only a single error can be propagated back to the client, so restrict the error
        // collection to the first error that occurs before the sink's `up_to`. This avoids
        // shipping the full error collection to the next operator and ensures we don't
        // accidentally report an error that is irrelevant to this copy.
        let error_stream =
            error.unary_frontier(Pipeline, "COPY TO S3 error filtering", |_cap, _info| {
                let up_to = sink.up_to.clone();
                let mut received_one = false;
                move |input, output| {
                    while let Some((time, data)) = input.next() {
                        if !up_to.less_equal(time.time()) && !received_one {
                            received_one = true;
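                            // The input container holds chunks of `((error, ()), time, diff)`
                            // records; walk the chunks, take the first record, and strip the
                            // unit key before handing the error downstream.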
                            output.session(&time).give_iterator(
                                data.iter()
                                    .flatten()
                                    .flat_map(|chunk| chunk.iter().cloned())
                                    .next()
                                    .map(|((err, ()), time, diff)| (err, time, diff))
                                    .into_iter(),
                            );
                        }
                    }
                }
            });

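        // Pull the sink's tuning parameters (Parquet row group sizing, Arrow builder buffer
        // sizing, and S3 multipart part size) from this worker's dynamic configuration.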
        let params = mz_storage_operators::s3_oneshot_sink::CopyToParameters {
            parquet_row_group_ratio: COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO
                .get(&compute_state.worker_config),
            arrow_builder_buffer_ratio: COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO
                .get(&compute_state.worker_config),
            s3_multipart_part_size_bytes: COPY_TO_S3_MULTIPART_PART_SIZE_BYTES
                .get(&compute_state.worker_config),
        };

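        // Render the actual S3 upload operators; the resulting token is handed back to the
        // caller, tying the sink's lifetime to how long the token is held.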
        let token = mz_storage_operators::s3_oneshot_sink::copy_to(
            input,
            error_stream,
            sink.up_to.clone(),
            self.upload_info.clone(),
            compute_state.context.connection_context.clone(),
            self.aws_connection.clone(),
            sink_id,
            self.connection_id,
            params,
            result_callback,
            self.output_batch_count,
        );

        Some(token)
    }
}

/// A type that guides the transmission of the number of rows back to the coordinator.
struct ResponseProtocol {
    pub sink_id: GlobalId,
    pub response_buffer: Option<Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>>,
}

impl ResponseProtocol {
    // This method should only be called once; calling it a second time will panic.
    fn send(&mut self, count: Result<u64, String>) {
        // The dataflow's input has been exhausted, so take the response buffer; this
        // prevents the `Drop` implementation below from also sending `CopyToResponse::Dropped`.
        let buffer = self.response_buffer.take().expect("expect response buffer");
        let response = match count {
            Ok(count) => CopyToResponse::RowCount(count),
            Err(error) => CopyToResponse::Error(error),
        };
        buffer.borrow_mut().push((self.sink_id, response));
    }
}

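// If the `ResponseProtocol` is dropped before `send` was ever called, report to the
// controller that the copy was dropped rather than completed.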
impl Drop for ResponseProtocol {
    fn drop(&mut self) {
        if let Some(buffer) = self.response_buffer.take() {
            buffer
                .borrow_mut()
                .push((self.sink_id, CopyToResponse::Dropped));
        }
    }
}