1use std::any::Any;
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use differential_dataflow::{Collection, Hashable};
15use mz_compute_client::protocol::response::CopyToResponse;
16use mz_compute_types::dyncfgs::{
17 COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
18 COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO,
19};
20use mz_compute_types::sinks::{ComputeSinkDesc, CopyToS3OneshotSinkConnection};
21use mz_repr::{Diff, GlobalId, Row, Timestamp};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::operator::consolidate_pact;
25use mz_timely_util::probe::{Handle, ProbeNotify};
26use timely::dataflow::Scope;
27use timely::dataflow::channels::pact::{Exchange, Pipeline};
28use timely::dataflow::operators::Operator;
29use timely::progress::Antichain;
30
31use crate::render::StartSignal;
32use crate::render::sinks::SinkRender;
33use crate::typedefs::KeyBatcher;
34
35impl<G> SinkRender<G> for CopyToS3OneshotSinkConnection
36where
37 G: Scope<Timestamp = Timestamp>,
38{
39 fn render_sink(
40 &self,
41 compute_state: &mut crate::compute_state::ComputeState,
42 sink: &ComputeSinkDesc<CollectionMetadata>,
43 sink_id: GlobalId,
44 _as_of: Antichain<Timestamp>,
45 _start_signal: StartSignal,
46 sinked_collection: Collection<G, Row, Diff>,
47 err_collection: Collection<G, DataflowError, Diff>,
48 _ct_times: Option<Collection<G, (), Diff>>,
49 output_probe: &Handle<Timestamp>,
50 ) -> Option<Rc<dyn Any>> {
51 let mut response_protocol = ResponseProtocol {
53 sink_id,
54 response_buffer: Some(Rc::clone(&compute_state.copy_to_response_buffer)),
55 };
56 let result_callback = move |count: Result<u64, String>| {
57 response_protocol.send(count);
58 };
59
60 let batch_count = self.output_batch_count;
66
67 let input = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
70 &sinked_collection.map(move |row| (row, ())).inner,
71 Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
72 "Consolidated COPY TO S3 input",
73 )
74 .probe_notify_with(vec![output_probe.clone()]);
75
76 let error = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
78 &err_collection.map(move |err| (err, ())).inner,
79 Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
80 err.hashed() % batch_count
81 }),
82 "Consolidated COPY TO S3 errors",
83 );
84
85 let error_stream =
90 error.unary_frontier(Pipeline, "COPY TO S3 error filtering", |_cap, _info| {
91 let up_to = sink.up_to.clone();
92 let mut received_one = false;
93 move |input, output| {
94 while let Some((time, data)) = input.next() {
95 if !up_to.less_equal(time.time()) && !received_one {
96 received_one = true;
97 output.session(&time).give_iterator(
98 data.iter()
99 .flatten()
100 .flat_map(|chunk| chunk.iter().cloned())
101 .next()
102 .map(|((err, ()), time, diff)| (err, time, diff))
103 .into_iter(),
104 );
105 }
106 }
107 }
108 });
109
110 let params = mz_storage_operators::s3_oneshot_sink::CopyToParameters {
111 parquet_row_group_ratio: COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO
112 .get(&compute_state.worker_config),
113 arrow_builder_buffer_ratio: COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO
114 .get(&compute_state.worker_config),
115 s3_multipart_part_size_bytes: COPY_TO_S3_MULTIPART_PART_SIZE_BYTES
116 .get(&compute_state.worker_config),
117 };
118
119 let token = mz_storage_operators::s3_oneshot_sink::copy_to(
120 input,
121 error_stream,
122 sink.up_to.clone(),
123 self.upload_info.clone(),
124 compute_state.context.connection_context.clone(),
125 self.aws_connection.clone(),
126 sink_id,
127 self.connection_id,
128 params,
129 result_callback,
130 self.output_batch_count,
131 );
132
133 Some(token)
134 }
135}
136
/// Tracks the single response owed for one COPY TO sink.
///
/// `send` records a row count or an error; if the value is dropped while it still holds
/// the buffer handle, its `Drop` impl records `CopyToResponse::Dropped` instead.
struct ResponseProtocol {
    /// Identifier of the sink; paired with every response pushed to the buffer.
    pub sink_id: GlobalId,
    /// Shared buffer that responses are pushed into. Set to `None` once a response has
    /// been recorded, so at most one response is pushed per value.
    pub response_buffer: Option<Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>>,
}
142
143impl ResponseProtocol {
144 fn send(&mut self, count: Result<u64, String>) {
146 let buffer = self.response_buffer.take().expect("expect response buffer");
149 let response = match count {
150 Ok(count) => CopyToResponse::RowCount(count),
151 Err(error) => CopyToResponse::Error(error),
152 };
153 buffer.borrow_mut().push((self.sink_id, response));
154 }
155}
156
157impl Drop for ResponseProtocol {
158 fn drop(&mut self) {
159 if let Some(buffer) = self.response_buffer.take() {
160 buffer
161 .borrow_mut()
162 .push((self.sink_id, CopyToResponse::Dropped));
163 }
164 }
165}