mz_storage_operators/s3_oneshot_sink.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Uploads a consolidated collection to S3

use std::any::Any;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::task::JoinHandleExt;
use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::PartialOrder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tracing::debug;

mod parquet;
mod pgcopy;

/// Copies the rows from the input collection to S3.
///
/// `worker_callback` is used to send the final count of rows uploaded to S3,
/// or an error message if the operator failed. This is per-worker, and
/// these responses are aggregated upstream by the compute client.
/// `sink_id` is used to identify the sink for logging purposes and as a
/// unique prefix for files created by the sink.
///
/// This renders three operators that coordinate the upload:
///   - initialization: confirms the S3 path is empty and writes any sentinel files
///   - upload: uploads data to S3
///   - completion: removes the sentinel file and calls the `worker_callback`
///
/// Returns a token that should be held to keep the sink alive.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
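/// # Example
///
/// A minimal sketch of how a caller might wire this up; the surrounding
/// dataflow setup, the `sinked_collection` / `err_collection` streams, the
/// parameter values, and the `response_tx` channel are illustrative
/// assumptions, not part of this module:
///
/// ```ignore
/// let token = copy_to(
///     sinked_collection,
///     err_collection,
///     up_to.clone(),
///     connection_details,
///     connection_context,
///     aws_connection,
///     sink_id,
///     connection_id,
///     CopyToParameters {
///         parquet_row_group_ratio: 20,
///         arrow_builder_buffer_ratio: 150,
///         s3_multipart_part_size_bytes: 8 * 1024 * 1024,
///     },
///     move |result: Result<u64, String>| {
///         // Each worker reports its uploaded-row count (or error) back to the caller.
///         let _ = response_tx.send(result);
///     },
///     output_batch_count,
/// );
/// // Hold on to `token`; dropping it shuts the sink down.
/// ```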
pub fn copy_to<G, F>(
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
    up_to: Antichain<G::Timestamp>,
    connection_details: S3UploadInfo,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    sink_id: GlobalId,
    connection_id: CatalogItemId,
    params: CopyToParameters,
    worker_callback: F,
    output_batch_count: u64,
) -> Rc<dyn Any>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let scope = input_collection.scope();

    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);

    let (start_stream, start_token) =
        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);

    let (completion_stream, upload_token) = match connection_details.format {
        S3SinkFormat::PgCopy(_) => render_upload_operator::<G, pgcopy::PgCopyUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
        S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
    };

    let completion_token = render_completion_operator(
        scope,
        connection_context,
        aws_connection,
        connection_id,
        sink_id,
        s3_key_manager,
        completion_stream,
        worker_callback,
    );

    Rc::new(vec![start_token, upload_token, completion_token])
}

/// Renders the 'initialization' operator, which does work on the leader worker only.
///
/// The leader worker receives all errors from the `err_stream`; if it
/// encounters any error, it exits early and broadcasts the error on the
/// returned `start_stream`, so that all workers can avoid unnecessary work.
///
/// Returns the `start_stream` carrying an error received on the `err_stream`,
/// if any, and `Ok(())` otherwise.
fn render_initialization_operator<G>(
    scope: G,
    sink_id: GlobalId,
    up_to: Antichain<G::Timestamp>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output();

    // Push all errors to the leader worker, so it can exit early before doing any
    // initialization work. This should be at most 1 incoming error per worker due to the
    // filtering of this stream in `CopyToS3OneshotSinkConnection::render_continuous_sink`.
    let mut error_handle = builder.new_input_for(
        &err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

    let button = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        // No error, break from the loop and proceed.
                        break;
                    }
                }
            }
        }

        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    // Broadcast the result to all workers so that they all see any error that occurs
    // and exit before doing any unnecessary work.
    (start_stream.broadcast(), button.press_on_drop())
}

/// Renders the 'completion' operator, which expects a `completion_stream`
/// that it reads over a Pipeline edge such that it receives a single
/// completion event per worker. It then forwards this result to the
/// `worker_callback` after any cleanup work (see below).
///
/// On the leader worker, this operator waits to see the empty frontier for
/// the `completion_stream` and then does some cleanup work before calling
/// the callback.
///
/// This cleanup work removes the INCOMPLETE sentinel file (see the description
/// of `render_initialization_operator` for more details).
fn render_completion_operator<G, F>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    sink_id: GlobalId,
    s3_key_manager: S3KeyManager,
    completion_stream: Stream<G, Result<u64, String>>,
    worker_callback: F,
) -> PressOnDropButton
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());

    let mut completion_input = builder.new_disconnected_input(&completion_stream, Pipeline);

    let button = builder.build(move |_| async move {
        // fallible async block to use the `?` operator for convenience
        let fallible_logic = async move {
            let mut row_count = None;
            while let Some(event) = completion_input.next().await {
                if let AsyncEvent::Data(_ts, data) = event {
                    for result in data {
                        assert!(
                            row_count.is_none(),
                            "unexpectedly received more than 1 event on the completion stream!"
                        );
                        row_count = Some(result.map_err(|e| anyhow!(e))?);
                    }
                }
            }
            let row_count = row_count.expect("did not receive completion event");

            if is_leader {
                debug!(%sink_id, %worker_id, "s3 leader worker completion");
                let sdk_config = aws_connection
                    .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                    .await?;

                let client = mz_aws_util::s3::new_client(&sdk_config);
                let bucket = s3_key_manager.bucket.clone();
                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();

                // Remove the INCOMPLETE sentinel file to indicate that the upload is complete.
                // This will race against other replicas who are completing the same uploads,
                // such that the first replica to complete its uploads will delete the sentinel
                // and the subsequent replicas shouldn't error if the object is already deleted.
                // TODO: Should we also write a manifest of all the files uploaded?
                mz_ore::task::spawn(|| "copytos3:completion", async move {
                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
                    client
                        .delete_object()
                        .bucket(bucket)
                        .key(incomplete_sentinel_key)
                        .send()
                        .await?;
                    Ok::<(), anyhow::Error>(())
                })
                .wait_and_assert_finished()
                .await?;
            }
            Ok::<u64, anyhow::Error>(row_count)
        };

        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
    });

    button.press_on_drop()
}

/// Renders the 'upload' operator, which waits on the `start_stream` to ensure
/// initialization is complete and then handles the uploads to S3.
///
/// Returns a `completion_stream` containing one event per worker with the
/// result of the upload operation: either an error or the number of rows
/// uploaded by that worker.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
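/// # Example
///
/// A sketch of how a row is assigned to a batch (and therefore to an uploader);
/// the concrete numbers are illustrative only:
///
/// ```ignore
/// // With output_batch_count = 4, a row whose hash is 10 belongs to batch
/// // 10 % 4 == 2, so the worker receiving it lazily creates (or reuses) the
/// // uploader for batch 2.
/// let batch = row.hashed() % output_batch_count;
/// ```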
fn render_upload_operator<G, T>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    up_to: Antichain<G::Timestamp>,
    start_stream: Stream<G, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
    let (completion_handle, completion_stream) = builder.new_output();
    let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        let [completion_cap] = caps.try_into().unwrap();

        // Drain any events in the start stream. Once this stream advances to the empty frontier we
        // can proceed with uploading. If we see an error in this stream, forward it to the completion
        // stream and early-exit.
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

        // fallible async block to use the `?` operator for convenience
        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                .await?;

            // Map of an uploader per batch.
            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

            // As a special case, the 0th worker always forces a file to be
            // created for batch 0, even if it never sees any data for batch 0.
            // This ensures that we always write at least one file to S3, even
            // if the input is empty. See database-issue#8599.
            if worker_id == 0 {
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            // We're consuming a batch of data here, and the upstream operator is
                            // responsible for exchanging the data according to this batch
                            // assignment.
                            let batch = row.hashed() % output_batch_count;
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    anyhow::bail!(
                                        "Invalid data in source errors, saw retractions ({}) for \
                                        row that does not exist",
                                        -*diff,
                                    )
                                }
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            // We are done, send the final count.
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}

/// dyncfg parameters for the `copy_to` operator, stored in this struct to avoid
/// requiring storage to depend on the compute crate. See `src/compute-types/src/dyncfgs.rs`
/// for the definition of these parameters.
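/// # Example
///
/// An illustrative reading of the two ratios; the `max_file_size` value and the
/// parameter values are assumptions here, and the authoritative derivation lives
/// in the `parquet` module:
///
/// ```ignore
/// let max_file_size: usize = 256 * 1024 * 1024; // assumed max size per S3 file
/// let params = CopyToParameters {
///     parquet_row_group_ratio: 20,
///     arrow_builder_buffer_ratio: 150,
///     s3_multipart_part_size_bytes: 8 * 1024 * 1024,
/// };
/// // Row groups target ~20% of the max file size ...
/// let row_group_size = max_file_size * params.parquet_row_group_ratio / 100;
/// // ... and the arrow builder buffers ~150% of a row group before flushing.
/// let builder_buffer_size = row_group_size * params.arrow_builder_buffer_ratio / 100;
/// ```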
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// The ratio (defined as a percentage) of row-group size to max-file-size.
    /// See the `parquet` module for more details on how this is used.
    pub parquet_row_group_ratio: usize,
    /// The ratio (defined as a percentage) of arrow-builder size to row-group size.
    /// See the `parquet` module for more details on how this is used.
    pub arrow_builder_buffer_ratio: usize,
    /// The size of each part in the multi-part upload to use when uploading files to S3.
    pub s3_multipart_part_size_bytes: usize,
}

/// This trait is used to abstract over the upload details for different file formats.
/// Each format has its own buffering semantics and upload logic, since some can be
/// written in a streaming fashion row-by-row, whereas others use a columnar format
/// that requires buffering a batch of rows before writing to S3.
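/// # Example
///
/// A hypothetical implementor, sketched only to show the expected call pattern
/// (`new`, then optionally `force_new_file`, then repeated `append_row`, then
/// `finish`); `CountingUploader` is not a real uploader in this crate:
///
/// ```ignore
/// struct CountingUploader {
///     rows: u64,
/// }
///
/// impl CopyToS3Uploader for CountingUploader {
///     fn new(
///         _sdk_config: SdkConfig,
///         _connection_details: S3UploadInfo,
///         _sink_id: &GlobalId,
///         _batch: u64,
///         _params: CopyToParameters,
///     ) -> Result<Self, anyhow::Error> {
///         Ok(CountingUploader { rows: 0 })
///     }
///     async fn force_new_file(&mut self) -> Result<(), anyhow::Error> {
///         Ok(())
///     }
///     async fn append_row(&mut self, _row: &Row) -> Result<(), anyhow::Error> {
///         self.rows += 1;
///         Ok(())
///     }
///     async fn finish(&mut self) -> Result<(), anyhow::Error> {
///         Ok(())
///     }
/// }
/// ```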
trait CopyToS3Uploader: Sized {
    /// Construct a new uploader for the given `batch` of this sink.
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Force the start of a new file, even if no rows have yet been appended or
    /// if the current file has not yet reached the configured `max_file_size`.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Append a row to the internal buffer, and optionally flush the buffer to S3.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flush the full remaining internal buffer to S3, and close all open resources.
    /// This will be called when the input stream is finished.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}