use std::any::Any;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::columnation::ColumnationStack;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::Broadcast;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tracing::debug;

mod parquet;
mod pgcopy;

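/// Copies the rows of the input collection to S3 in the format specified by
/// `connection_details`, invoking `worker_callback` on each worker with that
/// worker's final row count or an error message.
///
/// Renders three cooperating operators:
///   - initialization: watches the error stream and broadcasts a start
///     signal (or an error) to all workers;
///   - upload: writes the rows of each batch to S3;
///   - completion: deletes the INCOMPLETE sentinel object and reports the
///     result via `worker_callback`.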
pub fn copy_to<'scope, F>(
    input_collection: StreamVec<
        'scope,
        Timestamp,
        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
    >,
    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
    up_to: Antichain<Timestamp>,
    connection_details: S3UploadInfo,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    sink_id: GlobalId,
    connection_id: CatalogItemId,
    params: CopyToParameters,
    worker_callback: F,
    output_batch_count: u64,
    enforce_external_addresses: bool,
) -> Rc<dyn Any>
where
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let scope = input_collection.scope();

    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);

    let (start_stream, start_token) =
        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);

    let (completion_stream, upload_token) = match connection_details.format {
        S3SinkFormat::PgCopy(_) => render_upload_operator::<pgcopy::PgCopyUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
            enforce_external_addresses,
        ),
        S3SinkFormat::Parquet => render_upload_operator::<parquet::ParquetUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
            enforce_external_addresses,
        ),
    };

    let completion_token = render_completion_operator(
        scope,
        connection_context,
        aws_connection,
        connection_id,
        sink_id,
        s3_key_manager,
        completion_stream,
        worker_callback,
        enforce_external_addresses,
    );

    Rc::new(vec![start_token, upload_token, completion_token])
}

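/// Renders the 'initialization' operator, which determines whether the
/// upload may proceed.
///
/// The error stream is routed to a deterministically chosen leader worker.
/// If the leader sees an error at a timestamp not beyond `up_to` it emits
/// the error on the returned stream; otherwise, once the error frontier
/// passes `up_to`, it emits a single `Ok(())`. The result is broadcast so
/// that every worker observes the same start signal.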
fn render_initialization_operator<'scope>(
    scope: Scope<'scope, Timestamp>,
    sink_id: GlobalId,
    up_to: Antichain<Timestamp>,
    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
) -> (
    StreamVec<'scope, Timestamp, Result<(), String>>,
    PressOnDropButton,
) {
    let worker_id = scope.index();
    let num_workers = scope.peers();
    // Deterministically pick a leader worker for this sink.
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Route all errors to the leader worker, which decides whether the
    // upload may start.
    let mut error_handle = builder.new_input_for(
        err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

    let button = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        // Only errors at timestamps not beyond `up_to` are
                        // relevant to this sink.
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        break;
                    }
                }
            }
        }

        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    // Broadcast the start signal so that all workers observe it.
    (start_stream.broadcast(), button.press_on_drop())
}

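/// Renders the 'completion' operator, which reports each worker's upload
/// result (the number of rows uploaded, or an error) through
/// `worker_callback`.
///
/// After observing a successful result, the leader worker additionally
/// deletes the INCOMPLETE sentinel object from S3 to signal that the upload
/// is complete.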
fn render_completion_operator<'scope, F>(
    scope: Scope<'scope, Timestamp>,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    sink_id: GlobalId,
    s3_key_manager: S3KeyManager,
    completion_stream: StreamVec<'scope, Timestamp, Result<u64, String>>,
    worker_callback: F,
    enforce_external_addresses: bool,
) -> PressOnDropButton
where
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());

    let mut completion_input = builder.new_disconnected_input(completion_stream, Pipeline);

    let button = builder.build(move |_| async move {
        // Fallible block so that `?` can be used; the result is reported
        // through `worker_callback` below.
        let fallible_logic = async move {
            let mut row_count = None;
            while let Some(event) = completion_input.next().await {
                if let AsyncEvent::Data(_ts, data) = event {
                    for result in data {
                        assert!(
                            row_count.is_none(),
                            "unexpectedly received more than 1 event on the completion stream!"
                        );
                        row_count = Some(result.map_err(|e| anyhow!(e))?);
                    }
                }
            }
            let row_count = row_count.expect("did not receive completion event");

            if is_leader {
                debug!(%sink_id, %worker_id, "s3 leader worker completion");
                let sdk_config = aws_connection
                    .load_sdk_config(
                        &connection_context,
                        connection_id,
                        InTask::Yes,
                        enforce_external_addresses,
                    )
                    .await?;

                let client = mz_aws_util::s3::new_client(&sdk_config);
                let bucket = s3_key_manager.bucket.clone();
                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();

                // Remove the INCOMPLETE sentinel object to indicate that the
                // upload is now complete.
                mz_ore::task::spawn(|| "copytos3:completion", async move {
                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
                    client
                        .delete_object()
                        .bucket(bucket)
                        .key(incomplete_sentinel_key)
                        .send()
                        .await?;
                    Ok::<(), anyhow::Error>(())
                })
                .await?;
            }
            Ok::<u64, anyhow::Error>(row_count)
        };

        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
    });

    button.press_on_drop()
}

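/// Renders the 'upload' operator, which writes the rows of the input
/// collection to S3 using the format-specific uploader `T`.
///
/// Each worker waits for the start signal, assigns every row it receives to
/// one of `output_batch_count` batches by hash, and maintains one uploader
/// per batch it encounters. Once the input frontier passes `up_to`, all
/// uploaders are finished and the worker's row count (or an error message)
/// is emitted on the returned completion stream.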
fn render_upload_operator<'scope, T>(
    scope: Scope<'scope, Timestamp>,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: StreamVec<
        'scope,
        Timestamp,
        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
    >,
    up_to: Antichain<Timestamp>,
    start_stream: StreamVec<'scope, Timestamp, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
    enforce_external_addresses: bool,
) -> (
    StreamVec<'scope, Timestamp, Result<u64, String>>,
    PressOnDropButton,
)
where
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    let mut input_handle = builder.new_disconnected_input(input_collection, Pipeline);
    let (completion_handle, completion_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut start_handle = builder.new_input_for(start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        let [completion_cap] = caps.try_into().unwrap();

        // Wait for the start signal; if initialization reported an error,
        // forward it on the completion stream and exit early.
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(
                    &connection_context,
                    connection_id,
                    InTask::Yes,
                    enforce_external_addresses,
                )
                .await?;

            // One uploader per output batch handled by this worker.
            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

            if worker_id == 0 {
                // Worker 0 eagerly starts the file for batch 0, so that at
                // least one (possibly empty) file is written even if the
                // input contains no rows.
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            // Assign the row to an output batch by hash.
                            let batch = row.hashed() % output_batch_count;
                            // Only rows at timestamps not beyond `up_to`
                            // belong to this sink's snapshot.
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    tracing::error!(
                                        %sink_id, %diff, ?row,
                                        "S3 oneshot sink encountered negative multiplicities",
                                    );
                                    anyhow::bail!(
                                        "Invalid data in source, \
                                        saw retractions ({}) for row that does not exist: {:?}",
                                        -*diff,
                                        row,
                                    )
                                }
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                // Append the row once per multiplicity.
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            // The input is complete; flush all uploaders.
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}

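/// Tuning parameters for the copy-to-S3 uploaders.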
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// Ratio used to determine the parquet row-group size; see the
    /// `parquet` module for how it is applied.
    pub parquet_row_group_ratio: usize,
    /// Ratio used to size the arrow builder buffer; see the `parquet`
    /// module for how it is applied.
    pub arrow_builder_buffer_ratio: usize,
    /// Size in bytes of each part of the S3 multipart uploads.
    pub s3_multipart_part_size_bytes: usize,
}

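/// Abstracts over the upload details for the different output formats.
///
/// Implementations manage their own buffering and decide when buffered data
/// is written out to S3.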
trait CopyToS3Uploader: Sized {
    /// Creates an uploader for the given `batch` of this sink.
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Forces the start of a new file, even if no rows have been appended yet.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Appends a row to the uploader's buffer, flushing to S3 as needed.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flushes any remaining buffered rows and completes all uploads to S3.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}