use std::any::Any;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::columnation::ColumnationStack;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::Broadcast;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tracing::debug;

mod parquet;
mod pgcopy;

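/// Copies the rows from `input_collection` to the S3 location described by
/// `connection_details`, then invokes `worker_callback` on every worker with
/// the result: the total number of rows uploaded, or an error message.
///
/// Renders three async operators:
///   - initialization: watches `err_stream` for errors at times before `up_to`
///     and broadcasts a start signal (or the error) to all workers
///   - upload: writes rows to S3, assigned by row hash to one of
///     `output_batch_count` batches, using the uploader implementation
///     selected by `connection_details.format`
///   - completion: the leader worker removes the INCOMPLETE sentinel object,
///     and every worker reports its result through `worker_callback`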
pub fn copy_to<'scope, F>(
    input_collection: StreamVec<
        'scope,
        Timestamp,
        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
    >,
    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
    up_to: Antichain<Timestamp>,
    connection_details: S3UploadInfo,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    sink_id: GlobalId,
    connection_id: CatalogItemId,
    params: CopyToParameters,
    worker_callback: F,
    output_batch_count: u64,
) -> Rc<dyn Any>
where
    F: FnOnce(Result<u64, String>) + 'static,
{
    let scope = input_collection.scope();

    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);

    let (start_stream, start_token) =
        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);

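    // Render the uploader implementation selected by the sink's output format;
    // both arms share the same plumbing.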
    let (completion_stream, upload_token) = match connection_details.format {
        S3SinkFormat::PgCopy(_) => render_upload_operator::<pgcopy::PgCopyUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
        S3SinkFormat::Parquet => render_upload_operator::<parquet::ParquetUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
    };

    let completion_token = render_completion_operator(
        scope,
        connection_context,
        aws_connection,
        connection_id,
        sink_id,
        s3_key_manager,
        completion_stream,
        worker_callback,
    );

    Rc::new(vec![start_token, upload_token, completion_token])
}

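/// Renders the 'initialization' operator.
///
/// Before signaling the uploaders to start, this operator drains `err_stream`:
/// if any error occurs at a time before `up_to`, the error is forwarded on the
/// returned stream instead of a start signal, so no upload work is performed.
/// Once the error frontier passes `up_to`, the leader worker (chosen by
/// hashing the sink id) emits `Ok(())`, which is broadcast to all workers.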
fn render_initialization_operator<'scope>(
    scope: Scope<'scope, Timestamp>,
    sink_id: GlobalId,
    up_to: Antichain<Timestamp>,
    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
) -> (
    StreamVec<'scope, Timestamp, Result<(), String>>,
    PressOnDropButton,
) {
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

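    // Route all errors to the leader worker so a single worker decides
    // whether the upload may start.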
    let mut error_handle = builder.new_input_for(
        err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

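    // Drain the error stream until its frontier passes `up_to`, aborting on
    // the first relevant error. Only then does the leader emit the start
    // signal.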
    let button = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        break;
                    }
                }
            }
        }

        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    (start_stream.broadcast(), button.press_on_drop())
}

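/// Renders the 'completion' operator.
///
/// Each worker waits for exactly one result on `completion_stream` from its
/// local upload operator. On success, the leader worker (chosen by hashing the
/// sink id) deletes the INCOMPLETE sentinel object from S3 to mark the upload
/// as finished. Finally, every worker reports its result, success or error,
/// to `worker_callback`.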
fn render_completion_operator<'scope, F>(
    scope: Scope<'scope, Timestamp>,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    sink_id: GlobalId,
    s3_key_manager: S3KeyManager,
    completion_stream: StreamVec<'scope, Timestamp, Result<u64, String>>,
    worker_callback: F,
) -> PressOnDropButton
where
    F: FnOnce(Result<u64, String>) + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());

    let mut completion_input = builder.new_disconnected_input(completion_stream, Pipeline);

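    // All fallible work runs inside `fallible_logic` so that any error, from
    // the upload or from the cleanup below, reaches `worker_callback` as a
    // string.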
    let button = builder.build(move |_| async move {
        let fallible_logic = async move {
            let mut row_count = None;
            while let Some(event) = completion_input.next().await {
                if let AsyncEvent::Data(_ts, data) = event {
                    for result in data {
                        assert!(
                            row_count.is_none(),
                            "unexpectedly received more than one result on the completion stream!"
                        );
                        row_count = Some(result.map_err(|e| anyhow!(e))?);
                    }
                }
            }
            let row_count = row_count.expect("did not receive completion event");

            if is_leader {
                debug!(%sink_id, %worker_id, "s3 leader worker completion");
                let sdk_config = aws_connection
                    .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                    .await?;

                let client = mz_aws_util::s3::new_client(&sdk_config);
                let bucket = s3_key_manager.bucket.clone();
                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();

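                // Remove the INCOMPLETE sentinel object to signal that the
                // upload finished successfully. The delete runs in a spawned
                // task so the S3 call executes in a Tokio runtime context.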
                mz_ore::task::spawn(|| "copytos3:completion", async move {
                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
                    client
                        .delete_object()
                        .bucket(bucket)
                        .key(incomplete_sentinel_key)
                        .send()
                        .await?;
                    Ok::<(), anyhow::Error>(())
                })
                // Propagate both the task's join error and the delete error.
                .await??;
            }
            Ok::<u64, anyhow::Error>(row_count)
        };

        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
    });

    button.press_on_drop()
}

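/// Renders the 'upload' operator, which writes the input collection to S3.
///
/// Each row is assigned to one of `output_batch_count` batches by hashing the
/// row, and each batch is written through its own `CopyToS3Uploader`. Worker 0
/// eagerly creates the file for batch 0 so that at least one object exists
/// even when the input is empty. Once the input frontier passes `up_to`, all
/// uploaders are finished and the worker's row count (or error) is emitted on
/// the returned completion stream.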
fn render_upload_operator<'scope, T>(
    scope: Scope<'scope, Timestamp>,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: StreamVec<
        'scope,
        Timestamp,
        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
    >,
    up_to: Antichain<Timestamp>,
    start_stream: StreamVec<'scope, Timestamp, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
) -> (
    StreamVec<'scope, Timestamp, Result<u64, String>>,
    PressOnDropButton,
)
where
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    let mut input_handle = builder.new_disconnected_input(input_collection, Pipeline);
    let (completion_handle, completion_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut start_handle = builder.new_input_for(start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        let [completion_cap] = caps.try_into().unwrap();

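        // Wait for the start signal from the initialization operator; if it
        // reports an error, forward it on the completion stream and bail.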
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

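        // All fallible upload work happens in this block; its result is
        // forwarded on the completion stream below.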
        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                .await?;

            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

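            // Worker 0 eagerly starts a file for batch 0 so that at least one
            // (possibly empty) object is uploaded even if the input is empty.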
            if worker_id == 0 {
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

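            // Append rows to per-batch uploaders until the input frontier
            // passes `up_to`, then finish all uploaders.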
            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            let batch = row.hashed() % output_batch_count;
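                            // Only rows at times not beyond `up_to` belong to
                            // the output; anything later is ignored.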
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    tracing::error!(
                                        %sink_id, %diff, ?row,
                                        "S3 oneshot sink encountered negative multiplicities",
                                    );
                                    anyhow::bail!(
                                        "Invalid data in source, \
                                        saw retractions ({}) for row that does not exist: {:?}",
                                        -*diff,
                                        row,
                                    )
                                }
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}

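/// Configuration parameters for the copy-to-S3 operators.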
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// Ratio (in percent) of the row-group size to the max file size.
    /// Only used by the parquet uploader; see the `parquet` module.
    pub parquet_row_group_ratio: usize,
    /// Ratio (in percent) of the arrow-builder buffer size to the row-group
    /// size. Only used by the parquet uploader; see the `parquet` module.
    pub arrow_builder_buffer_ratio: usize,
    /// The size (in bytes) of each part in S3 multipart uploads.
    pub s3_multipart_part_size_bytes: usize,
}

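/// Implemented per output format (see the `pgcopy` and `parquet` modules) to
/// buffer rows and upload them to S3 as one or more files per batch.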
trait CopyToS3Uploader: Sized {
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Force the start of a new file, even if no rows have been appended yet.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Append a row to the internal buffer, flushing to S3 when size limits
    /// are reached.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flush any remaining buffered rows to S3 and finish all uploads.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}