use std::any::Any;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::task::JoinHandleExt;
use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::PartialOrder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tracing::debug;

mod parquet;
mod pgcopy;

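/// Copy the rows from the input collection to S3. Renders three operators: an
/// initialization operator that inspects the error stream and signals whether the
/// upload may start, an upload operator (pgcopy or parquet, depending on the sink
/// format) that writes the rows, and a completion operator that reports the final
/// row count (or an error) to `worker_callback` and cleans up the INCOMPLETE
/// sentinel object. The returned token keeps all three operators alive.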
pub fn copy_to<G, F>(
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
    up_to: Antichain<G::Timestamp>,
    connection_details: S3UploadInfo,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    sink_id: GlobalId,
    connection_id: CatalogItemId,
    params: CopyToParameters,
    worker_callback: F,
    output_batch_count: u64,
) -> Rc<dyn Any>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let scope = input_collection.scope();

    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);

    let (start_stream, start_token) =
        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);

    let (completion_stream, upload_token) = match connection_details.format {
        S3SinkFormat::PgCopy(_) => render_upload_operator::<G, pgcopy::PgCopyUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
        S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
    };

    let completion_token = render_completion_operator(
        scope,
        connection_context,
        aws_connection,
        connection_id,
        sink_id,
        s3_key_manager,
        completion_stream,
        worker_callback,
    );

    Rc::new(vec![start_token, upload_token, completion_token])
}

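/// Renders the initialization operator, which waits for the error stream's
/// frontier to pass `up_to`. All errors are exchanged to a single leader worker
/// (chosen by hashing the sink id); if any error occurs at a time not beyond
/// `up_to`, that error is emitted on the start stream instead of the `Ok(())`
/// start signal. The resulting stream is broadcast so every worker observes the
/// outcome.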
fn render_initialization_operator<G>(
    scope: G,
    sink_id: GlobalId,
    up_to: Antichain<G::Timestamp>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output();

    let mut error_handle = builder.new_input_for(
        &err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

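    // Drain the error stream; any error at a time not beyond `up_to` aborts the
    // sink by forwarding the error on the start stream instead of the `Ok` signal.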
    let button = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        break;
                    }
                }
            }
        }

        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    (start_stream.broadcast(), button.press_on_drop())
}

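/// Renders the completion operator, which drains the completion stream and hands
/// each worker's result to `worker_callback`. On the leader worker (chosen by
/// hashing the sink id), a successful completion additionally deletes the
/// INCOMPLETE sentinel object from the S3 bucket.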
fn render_completion_operator<G, F>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    sink_id: GlobalId,
    s3_key_manager: S3KeyManager,
    completion_stream: Stream<G, Result<u64, String>>,
    worker_callback: F,
) -> PressOnDropButton
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());

    let mut completion_input = builder.new_disconnected_input(&completion_stream, Pipeline);

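    // Each worker reports its own result through `worker_callback`; only the
    // leader worker additionally performs the S3 cleanup below.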
    let button = builder.build(move |_| async move {
        let fallible_logic = async move {
            let mut row_count = None;
            while let Some(event) = completion_input.next().await {
                if let AsyncEvent::Data(_ts, data) = event {
                    for result in data {
                        assert!(
                            row_count.is_none(),
                            "unexpectedly received more than 1 event on the completion stream!"
                        );
                        row_count = Some(result.map_err(|e| anyhow!(e))?);
                    }
                }
            }
            let row_count = row_count.expect("did not receive completion event");

            if is_leader {
                debug!(%sink_id, %worker_id, "s3 leader worker completion");
                let sdk_config = aws_connection
                    .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                    .await?;

                let client = mz_aws_util::s3::new_client(&sdk_config);
                let bucket = s3_key_manager.bucket.clone();
                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();

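                // Delete the INCOMPLETE sentinel object to signal that all uploads
                // for this sink finished successfully. This runs in a spawned task,
                // presumably so the S3 request executes on the async runtime rather
                // than blocking the timely worker thread.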
                mz_ore::task::spawn(|| "copytos3:completion", async move {
                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
                    client
                        .delete_object()
                        .bucket(bucket)
                        .key(incomplete_sentinel_key)
                        .send()
                        .await?;
                    Ok::<(), anyhow::Error>(())
                })
                .wait_and_assert_finished()
                .await?;
            }
            Ok::<u64, anyhow::Error>(row_count)
        };

        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
    });

    button.press_on_drop()
}

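/// Renders the upload operator, which writes the rows it receives to S3 using the
/// uploader implementation `T`. Rows are assigned to one of `output_batch_count`
/// output batches by hashing the row, and a separate uploader is lazily created
/// for each batch seen on this worker. Once the input frontier passes `up_to`,
/// all uploaders are finished and the worker's row count is emitted on the
/// returned completion stream.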
fn render_upload_operator<G, T>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    up_to: Antichain<G::Timestamp>,
    start_stream: Stream<G, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
    let (completion_handle, completion_stream) = builder.new_output();
    let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        let [completion_cap] = caps.try_into().unwrap();

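        // Wait for the start signal from the initialization operator. If it carries
        // an error, forward it on the completion stream and skip the upload.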
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                .await?;

            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

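            // Worker 0 always creates an uploader for batch 0 and forces a new file,
            // so that at least one output object is written even when the sink
            // produces no rows.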
            if worker_id == 0 {
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            let batch = row.hashed() % output_batch_count;
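                            // Only upload rows at times not beyond the `up_to`
                            // frontier; anything at or past `up_to` is ignored.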
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    anyhow::bail!(
                                        "Invalid data in source errors, saw retractions ({}) for \
                                        row that does not exist",
                                        -*diff,
                                    )
                                }
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}

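/// Tuning parameters for the S3 uploaders, threaded through to the pgcopy and
/// parquet implementations.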
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// Ratio used by the parquet uploader when sizing row groups.
    pub parquet_row_group_ratio: usize,
    /// Ratio used by the parquet uploader when sizing the arrow builder buffer.
    pub arrow_builder_buffer_ratio: usize,
    /// Size, in bytes, of each part of an S3 multipart upload.
    pub s3_multipart_part_size_bytes: usize,
}

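/// Common interface of the pgcopy and parquet uploaders: an uploader writes the
/// rows of a single output batch of a single sink to one or more S3 objects.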
trait CopyToS3Uploader: Sized {
    /// Create an uploader for the given sink and output batch.
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Start a new output file, even if no rows have been appended yet.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Append a row to the upload for this batch.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flush any buffered data and finish all uploads for this batch.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}