use std::any::Any;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::task::JoinHandleExt;
use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tracing::debug;

mod parquet;
mod pgcopy;

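/// Copies the rows of the input collection to S3 as PgCopy or Parquet files.
///
/// Renders three operators:
///   - an initialization operator that drains the error stream and signals the
///     uploaders to start (or to abort, if a dataflow error occurred),
///   - an upload operator that writes rows to S3, assigning each row to one of
///     `output_batch_count` batches by hashing it,
///   - a completion operator that removes the INCOMPLETE sentinel object and
///     reports each worker's final row count (or an error) via `worker_callback`.
///
/// Returns a token that, when dropped, shuts down all three operators.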
pub fn copy_to<G, F>(
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
    up_to: Antichain<G::Timestamp>,
    connection_details: S3UploadInfo,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    sink_id: GlobalId,
    connection_id: CatalogItemId,
    params: CopyToParameters,
    worker_callback: F,
    output_batch_count: u64,
) -> Rc<dyn Any>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let scope = input_collection.scope();

    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);

    let (start_stream, start_token) =
        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);

    let (completion_stream, upload_token) = match connection_details.format {
        S3SinkFormat::PgCopy(_) => render_upload_operator::<G, pgcopy::PgCopyUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
        S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
    };

    let completion_token = render_completion_operator(
        scope,
        connection_context,
        aws_connection,
        connection_id,
        sink_id,
        s3_key_manager,
        completion_stream,
        worker_callback,
    );

    Rc::new(vec![start_token, upload_token, completion_token])
}

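/// Renders the 'initialization' operator, whose logic runs on a single
/// (leader) worker.
///
/// The operator first drains the error stream: if any `DataflowError` is seen
/// at a time not beyond `up_to`, it emits that error on the returned stream and
/// exits early so that no uploads are attempted. Otherwise, once the error
/// stream's frontier passes `up_to`, the leader emits a single `Ok(())` start
/// signal. The returned stream is broadcast to all workers.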
fn render_initialization_operator<G>(
    scope: G,
    sink_id: GlobalId,
    up_to: Antichain<G::Timestamp>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Route all errors to the leader worker so that it can decide whether to
    // start the upload or abort.
    let mut error_handle = builder.new_input_for(
        &err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

    let button = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        // No error seen before the `up_to`, so we can proceed.
                        break;
                    }
                }
            }
        }

        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    // Broadcast the start signal (or error) to all workers.
    (start_stream.broadcast(), button.press_on_drop())
}

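/// Renders the 'completion' operator, which waits for the single
/// `Result<u64, String>` event that each worker's upload operator emits on the
/// `completion_stream`.
///
/// On the leader worker, after a successful upload, this operator deletes the
/// INCOMPLETE sentinel object from S3. Every worker then reports its result
/// (its uploaded row count, or an error) through `worker_callback`.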
fn render_completion_operator<G, F>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    sink_id: GlobalId,
    s3_key_manager: S3KeyManager,
    completion_stream: Stream<G, Result<u64, String>>,
    worker_callback: F,
) -> PressOnDropButton
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());

    let mut completion_input = builder.new_disconnected_input(&completion_stream, Pipeline);

    let button = builder.build(move |_| async move {
        // Fallible block so we can use the `?` operator and report any error
        // through `worker_callback` below.
        let fallible_logic = async move {
            let mut row_count = None;
            while let Some(event) = completion_input.next().await {
                if let AsyncEvent::Data(_ts, data) = event {
                    for result in data {
                        assert!(
                            row_count.is_none(),
                            "unexpectedly received more than 1 event on the completion stream!"
                        );
                        row_count = Some(result.map_err(|e| anyhow!(e))?);
                    }
                }
            }
            let row_count = row_count.expect("did not receive completion event");

            if is_leader {
                debug!(%sink_id, %worker_id, "s3 leader worker completion");
                let sdk_config = aws_connection
                    .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                    .await?;

                let client = mz_aws_util::s3::new_client(&sdk_config);
                let bucket = s3_key_manager.bucket.clone();
                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();

                // Remove the INCOMPLETE sentinel object in a spawned task and
                // wait for it to finish.
                mz_ore::task::spawn(|| "copytos3:completion", async move {
                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
                    client
                        .delete_object()
                        .bucket(bucket)
                        .key(incomplete_sentinel_key)
                        .send()
                        .await?;
                    Ok::<(), anyhow::Error>(())
                })
                .wait_and_assert_finished()
                .await?;
            }
            Ok::<u64, anyhow::Error>(row_count)
        };

        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
    });

    button.press_on_drop()
}

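/// Renders the 'upload' operator, which waits on the `start_stream` before
/// uploading the rows of `input_collection` to S3.
///
/// Each row is assigned to one of `output_batch_count` batches by hashing it,
/// and a separate `CopyToS3Uploader` is created for each batch seen by this
/// worker. Once the input frontier passes `up_to`, all uploaders are finished
/// and the worker's row count (or an error) is emitted on the returned
/// completion stream.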
fn render_upload_operator<G, T>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    up_to: Antichain<G::Timestamp>,
    start_stream: Stream<G, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
    let (completion_handle, completion_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        let [completion_cap] = caps.try_into().unwrap();

        // Wait for the start signal from the initialization operator; if it
        // reported an error, forward it and exit without uploading anything.
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

        // Fallible block so we can use the `?` operator.
        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                .await?;

            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

            // Eagerly create the uploader for batch 0 on worker 0 and force a
            // new file, so that at least one file is written to S3 even if the
            // input contains no rows.
            if worker_id == 0 {
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            // Assign the row to an output batch by hashing it.
                            let batch = row.hashed() % output_batch_count;
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    tracing::error!(
                                        %sink_id, %diff, ?row,
                                        "S3 oneshot sink encountered negative multiplicities",
                                    );
                                    anyhow::bail!(
                                        "Invalid data in source, \
                                        saw retractions ({}) for row that does not exist: {:?}",
                                        -*diff,
                                        row,
                                    )
                                }
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}

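/// Tuning parameters for the copy-to-S3 sink, passed through to the uploaders.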
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// Ratio controlling the Parquet row-group size.
    pub parquet_row_group_ratio: usize,
    /// Ratio controlling the Arrow builder buffer size.
    pub arrow_builder_buffer_ratio: usize,
    /// Part size, in bytes, to use for S3 multipart uploads.
    pub s3_multipart_part_size_bytes: usize,
}

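/// Implemented by the format-specific uploaders (`pgcopy::PgCopyUploader` and
/// `parquet::ParquetUploader`), which buffer rows and upload them to S3 as one
/// or more files for a single batch.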
trait CopyToS3Uploader: Sized {
    /// Creates an uploader for the given batch of the sink.
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Forces the start of a new file, even if no rows have been appended yet.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Appends a row to the current upload.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flushes any buffered rows and finishes all in-progress uploads.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}