1use std::any::Any;
11use std::cell::RefCell;
12use std::rc::Rc;
13
14use differential_dataflow::{Hashable, VecCollection};
15use mz_compute_client::protocol::response::CopyToResponse;
16use mz_compute_types::dyncfgs::{
17 COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
18 COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO,
19};
20use mz_compute_types::sinks::{ComputeSinkDesc, CopyToS3OneshotSinkConnection};
21use mz_repr::{Diff, GlobalId, Row, Timestamp};
22use mz_storage_types::controller::CollectionMetadata;
23use mz_storage_types::errors::DataflowError;
24use mz_timely_util::operator::consolidate_pact;
25use mz_timely_util::probe::{Handle, ProbeNotify};
26use timely::dataflow::channels::pact::{Exchange, Pipeline};
27use timely::dataflow::operators::Operator;
28use timely::progress::Antichain;
29
30use crate::render::StartSignal;
31use crate::render::sinks::SinkRender;
32use crate::typedefs::KeyBatcher;
33
34impl<'scope> SinkRender<'scope> for CopyToS3OneshotSinkConnection {
35 fn render_sink(
36 &self,
37 compute_state: &mut crate::compute_state::ComputeState,
38 sink: &ComputeSinkDesc<CollectionMetadata>,
39 sink_id: GlobalId,
40 _as_of: Antichain<Timestamp>,
41 _start_signal: StartSignal,
42 sinked_collection: VecCollection<'scope, Timestamp, Row, Diff>,
43 err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
44 _ct_times: Option<VecCollection<'scope, Timestamp, (), Diff>>,
45 output_probe: &Handle<Timestamp>,
46 ) -> Option<Rc<dyn Any>> {
47 let mut response_protocol = ResponseProtocol {
49 sink_id,
50 response_buffer: Some(Rc::clone(&compute_state.copy_to_response_buffer)),
51 };
52 let result_callback = move |count: Result<u64, String>| {
53 response_protocol.send(count);
54 };
55
56 let batch_count = self.output_batch_count;
62
63 let input = consolidate_pact::<KeyBatcher<_, _, _>, _>(
66 sinked_collection.map(move |row| (row, ())).inner,
67 Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
68 "Consolidated COPY TO S3 input",
69 )
70 .probe_notify_with(vec![output_probe.clone()]);
71
72 let error = consolidate_pact::<KeyBatcher<_, _, _>, _>(
74 err_collection.map(move |err| (err, ())).inner,
75 Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
76 err.hashed() % batch_count
77 }),
78 "Consolidated COPY TO S3 errors",
79 );
80
81 let error_stream =
86 error.unary_frontier(Pipeline, "COPY TO S3 error filtering", |_cap, _info| {
87 let up_to = sink.up_to.clone();
88 let mut received_one = false;
89 move |(input, _), output| {
90 input.for_each_time(|time, data| {
91 if !up_to.less_equal(time.time()) && !received_one {
92 received_one = true;
93 output.session(&time).give_iterator(
94 data.flatten()
95 .flatten()
96 .flat_map(|chunk| chunk.iter().cloned())
97 .next()
98 .map(|((err, ()), time, diff)| (err, time, diff))
99 .into_iter(),
100 );
101 }
102 });
103 }
104 });
105
106 let params = mz_storage_operators::s3_oneshot_sink::CopyToParameters {
107 parquet_row_group_ratio: COPY_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO
108 .get(&compute_state.worker_config),
109 arrow_builder_buffer_ratio: COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO
110 .get(&compute_state.worker_config),
111 s3_multipart_part_size_bytes: COPY_TO_S3_MULTIPART_PART_SIZE_BYTES
112 .get(&compute_state.worker_config),
113 };
114
115 let token = mz_storage_operators::s3_oneshot_sink::copy_to(
116 input,
117 error_stream,
118 sink.up_to.clone(),
119 self.upload_info.clone(),
120 compute_state.context.connection_context.clone(),
121 self.aws_connection.clone(),
122 sink_id,
123 self.connection_id,
124 params,
125 result_callback,
126 self.output_batch_count,
127 );
128
129 Some(token)
130 }
131}
132
133struct ResponseProtocol {
135 pub sink_id: GlobalId,
136 pub response_buffer: Option<Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>>,
137}
138
139impl ResponseProtocol {
140 fn send(&mut self, count: Result<u64, String>) {
142 let buffer = self.response_buffer.take().expect("expect response buffer");
145 let response = match count {
146 Ok(count) => CopyToResponse::RowCount(count),
147 Err(error) => CopyToResponse::Error(error),
148 };
149 buffer.borrow_mut().push((self.sink_id, response));
150 }
151}
152
153impl Drop for ResponseProtocol {
154 fn drop(&mut self) {
155 if let Some(buffer) = self.response_buffer.take() {
156 buffer
157 .borrow_mut()
158 .push((self.sink_id, CopyToResponse::Dropped));
159 }
160 }
161}