1use std::any::Any;
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use differential_dataflow::{Hashable, VecCollection};
15use mz_compute_client::protocol::response::CopyToResponse;
16use mz_compute_types::dyncfgs::{
17 COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
18 COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO,
19};
20use mz_compute_types::sinks::{ComputeSinkDesc, CopyToS3OneshotSinkConnection};
21use mz_repr::{Diff, GlobalId, Row, Timestamp};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_timely_util::operator::consolidate_pact;
24use mz_timely_util::probe::{Handle, ProbeNotify};
25use timely::dataflow::channels::pact::{Exchange, Pipeline};
26use timely::dataflow::operators::Operator;
27use timely::progress::Antichain;
28
29use crate::render::StartSignal;
30use crate::render::errors::DataflowErrorSer;
31use crate::render::sinks::SinkRender;
32use crate::typedefs::KeyBatcher;
33
34impl<'scope> SinkRender<'scope> for CopyToS3OneshotSinkConnection {
35 fn render_sink(
36 &self,
37 compute_state: &mut crate::compute_state::ComputeState,
38 sink: &ComputeSinkDesc<CollectionMetadata>,
39 sink_id: GlobalId,
40 _as_of: Antichain<Timestamp>,
41 _start_signal: StartSignal,
42 sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
43 err_collection: VecCollection<'scope, Timestamp, DataflowErrorSer, Diff>,
44 output_probe: &Handle<Timestamp>,
45 ) -> Option<Rc<dyn Any>> {
46 let mut response_protocol = ResponseProtocol {
48 sink_id,
49 response_buffer: Some(Rc::clone(&compute_state.copy_to_response_buffer)),
50 };
51 let result_callback = move |count: Result<u64, String>| {
52 response_protocol.send(count);
53 };
54
55 let batch_count = self.output_batch_count;
61
62 let input = consolidate_pact::<KeyBatcher<_, _, _>, _>(
65 sinked_collection.map(move |row| (row, ())).inner,
66 Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
67 "Consolidated COPY TO S3 input",
68 )
69 .probe_notify_with(vec![output_probe.clone()]);
70
71 let error = consolidate_pact::<KeyBatcher<_, _, _>, _>(
73 err_collection.map(move |err| (err, ())).inner,
74 Exchange::new(move |((err, _), _, _): &((DataflowErrorSer, _), _, _)| {
75 err.hashed() % batch_count
76 }),
77 "Consolidated COPY TO S3 errors",
78 );
79
80 let error_stream =
85 error.unary_frontier(Pipeline, "COPY TO S3 error filtering", |_cap, _info| {
86 let up_to = sink.up_to.clone();
87 let mut received_one = false;
88 move |(input, _), output| {
89 input.for_each_time(|time, data| {
90 if !up_to.less_equal(time.time()) && !received_one {
91 received_one = true;
92 output.session(&time).give_iterator(
93 data.flatten()
94 .flatten()
95 .flat_map(|chunk| chunk.iter().cloned())
96 .next()
97 .map(|((err, ()), time, diff)| (err, time, diff))
98 .into_iter(),
99 );
100 }
101 });
102 }
103 });
104
105 let params = mz_storage_operators::s3_oneshot_sink::CopyToParameters {
106 parquet_row_group_ratio: COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO
107 .get(&compute_state.worker_config),
108 arrow_builder_buffer_ratio: COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO
109 .get(&compute_state.worker_config),
110 s3_multipart_part_size_bytes: COPY_TO_S3_MULTIPART_PART_SIZE_BYTES
111 .get(&compute_state.worker_config),
112 };
113
114 use timely::dataflow::operators::vec::Map;
117 let error_stream = error_stream.map(|(err, time, diff)| (err.deserialize(), time, diff));
118
119 let enforce_external_addresses =
120 mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES.get(&compute_state.worker_config);
121
122 let token = mz_storage_operators::s3_oneshot_sink::copy_to(
123 input,
124 error_stream,
125 sink.up_to.clone(),
126 self.upload_info.clone(),
127 compute_state.context.connection_context.clone(),
128 self.aws_connection.clone(),
129 sink_id,
130 self.connection_id,
131 params,
132 result_callback,
133 self.output_batch_count,
134 enforce_external_addresses,
135 );
136
137 Some(token)
138 }
139}
140
/// Reports the outcome of a `COPY TO S3` sink into a shared response buffer.
///
/// `send` pushes the final result and takes the buffer out. If the value is
/// dropped while the buffer is still present, `CopyToResponse::Dropped` is
/// pushed instead.
struct ResponseProtocol {
    /// The sink whose outcome is being reported.
    pub sink_id: GlobalId,
    /// Shared buffer of `(sink id, response)` pairs. It is taken (left as `None`)
    /// when a response is pushed by `send` or on drop.
    pub response_buffer: Option<Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>>,
}
146
147impl ResponseProtocol {
148 fn send(&mut self, count: Result<u64, String>) {
150 let buffer = self.response_buffer.take().expect("expect response buffer");
153 let response = match count {
154 Ok(count) => CopyToResponse::RowCount(count),
155 Err(error) => CopyToResponse::Error(error),
156 };
157 buffer.borrow_mut().push((self.sink_id, response));
158 }
159}
160
161impl Drop for ResponseProtocol {
162 fn drop(&mut self) {
163 if let Some(buffer) = self.response_buffer.take() {
164 buffer
165 .borrow_mut()
166 .push((self.sink_id, CopyToResponse::Dropped));
167 }
168 }
169}