mz_storage_operators/s3_oneshot_sink.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Uploads a consolidated collection to S3

use std::any::Any;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::Hashable;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::task::JoinHandleExt;
use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::connections::aws::AwsConnection;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tracing::debug;

mod parquet;
mod pgcopy;

/// Copy the rows from the input collection to S3.
/// `worker_callback` is used to send the final count of rows uploaded to S3,
/// or an error message if the operator failed. This is per-worker, and
/// these responses are aggregated upstream by the compute client.
/// `sink_id` is used to identify the sink for logging purposes and as a
/// unique prefix for files created by the sink.
///
/// This renders 3 operators used to coordinate the upload:
///   - initialization: confirms the S3 path is empty and writes any sentinel files
///   - upload: uploads data to S3
///   - completion: removes the sentinel file and calls the `worker_callback`
///
/// Returns a token that should be held to keep the sink alive.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
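///
/// # Example (sketch)
///
/// A hedged sketch of how a caller might render this sink; the surrounding
/// dataflow setup (the `rows` and `errs` streams, the connection values, and
/// the `result_tx` channel) is assumed rather than taken from this crate.
///
/// ```ignore
/// let token = copy_to(
///     rows,               // Stream<G, Vec<TimelyStack<((Row, ()), Timestamp, Diff)>>>
///     errs,               // Stream<G, (DataflowError, Timestamp, Diff)>
///     up_to.clone(),
///     upload_info,        // S3UploadInfo
///     connection_context,
///     aws_connection,
///     sink_id,
///     connection_id,
///     params,             // CopyToParameters
///     move |result| { let _ = result_tx.send(result); }, // hypothetical per-worker channel
///     output_batch_count,
/// );
/// // Keep `token` alive for as long as the sink should keep running.
/// ```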
pub fn copy_to<G, F>(
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
    up_to: Antichain<G::Timestamp>,
    connection_details: S3UploadInfo,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    sink_id: GlobalId,
    connection_id: CatalogItemId,
    params: CopyToParameters,
    worker_callback: F,
    output_batch_count: u64,
) -> Rc<dyn Any>
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let scope = input_collection.scope();

    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);

    let (start_stream, start_token) =
        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);

    let (completion_stream, upload_token) = match connection_details.format {
        S3SinkFormat::PgCopy(_) => render_upload_operator::<G, pgcopy::PgCopyUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
        S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
            scope.clone(),
            connection_context.clone(),
            aws_connection.clone(),
            connection_id,
            connection_details,
            sink_id,
            input_collection,
            up_to,
            start_stream,
            params,
            output_batch_count,
        ),
    };

    let completion_token = render_completion_operator(
        scope,
        connection_context,
        aws_connection,
        connection_id,
        sink_id,
        s3_key_manager,
        completion_stream,
        worker_callback,
    );

    Rc::new(vec![start_token, upload_token, completion_token])
}

/// Renders the 'initialization' operator, which does work on the leader worker only.
///
/// The leader worker receives all errors from the `err_stream` and, if it
/// encounters any, exits early and broadcasts the error on the returned
/// `start_stream` so that all workers can avoid unnecessary work.
///
/// Returns the `start_stream` carrying an error received on the `err_stream`,
/// if any, and otherwise `Ok(())`.
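///
/// As a sketch (grounded in how the upload operator below consumes it), a
/// downstream operator drains `start_stream` and forwards any error before
/// doing its own work:
///
/// ```ignore
/// while let Some(event) = start_handle.next().await {
///     if let AsyncEvent::Data(cap, data) = event {
///         for res in data {
///             if res.is_err() {
///                 completion_handle.give(&cap, res.map(|_| 0));
///                 return;
///             }
///         }
///     }
/// }
/// ```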
fn render_initialization_operator<G>(
    scope: G,
    sink_id: GlobalId,
    up_to: Antichain<G::Timestamp>,
    err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Push all errors to the leader worker so that it exits early, before doing any
    // initialization work. There should be at most one incoming error per worker due to the
    // filtering of this stream in `CopyToS3OneshotSinkConnection::render_continuous_sink`.
    let mut error_handle = builder.new_input_for(
        &err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

    let button = builder.build(move |caps| async move {
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        // No error, break from loop and proceed
                        break;
                    }
                }
            }
        }

        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    // Broadcast the result to all workers so that they will all see any error that occurs
    // and exit before doing any unnecessary work
    (start_stream.broadcast(), button.press_on_drop())
}

/// Renders the 'completion' operator, which expects a `completion_stream`
/// that it reads over a Pipeline edge such that it receives a single
/// completion event per worker. It then forwards this result to the
/// `worker_callback` after any cleanup work (see below).
///
/// On the leader worker, this operator waits to see the empty frontier for
/// the `completion_stream` and then does some cleanup work before calling
/// the callback.
///
/// This cleanup work removes the INCOMPLETE sentinel file (see the description
/// of `render_initialization_operator` for more details).
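///
/// The per-worker results are aggregated upstream (see `copy_to`); a hedged
/// sketch of such an aggregation, with `worker_results` assumed to be the
/// collected callback values:
///
/// ```ignore
/// let total: Result<u64, String> = worker_results
///     .into_iter()
///     .try_fold(0u64, |acc, res| res.map(|n| acc + n));
/// ```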
fn render_completion_operator<G, F>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    sink_id: GlobalId,
    s3_key_manager: S3KeyManager,
    completion_stream: Stream<G, Result<u64, String>>,
    worker_callback: F,
) -> PressOnDropButton
where
    G: Scope<Timestamp = Timestamp>,
    F: FnOnce(Result<u64, String>) -> () + 'static,
{
    let worker_id = scope.index();
    let num_workers = scope.peers();
    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());

    let mut completion_input = builder.new_disconnected_input(&completion_stream, Pipeline);

    let button = builder.build(move |_| async move {
        // fallible async block to use the `?` operator for convenience
        let fallible_logic = async move {
            let mut row_count = None;
            while let Some(event) = completion_input.next().await {
                if let AsyncEvent::Data(_ts, data) = event {
                    for result in data {
                        assert!(
                            row_count.is_none(),
                            "unexpectedly received more than 1 event on the completion stream!"
                        );
                        row_count = Some(result.map_err(|e| anyhow!(e))?);
                    }
                }
            }
            let row_count = row_count.expect("did not receive completion event");

            if is_leader {
                debug!(%sink_id, %worker_id, "s3 leader worker completion");
                let sdk_config = aws_connection
                    .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                    .await?;

                let client = mz_aws_util::s3::new_client(&sdk_config);
                let bucket = s3_key_manager.bucket.clone();
                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();

                // Remove the INCOMPLETE sentinel file to indicate that the upload is complete.
                // This will race against other replicas that are completing the same uploads,
                // such that the first replica to complete its uploads will delete the sentinel
                // and the subsequent replicas shouldn't error if the object is already deleted.
                // TODO: Should we also write a manifest of all the files uploaded?
                mz_ore::task::spawn(|| "copytos3:completion", async move {
                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
                    client
                        .delete_object()
                        .bucket(bucket)
                        .key(incomplete_sentinel_key)
                        .send()
                        .await?;
                    Ok::<(), anyhow::Error>(())
                })
                .wait_and_assert_finished()
                .await?;
            }
            Ok::<u64, anyhow::Error>(row_count)
        };

        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
    });

    button.press_on_drop()
}

/// Renders the 'upload' operator, which waits on the `start_stream` to ensure
/// initialization is complete and then handles the uploads to S3.
/// Returns a `completion_stream` that contains a single event per worker holding
/// the result of the upload operation: either an error or the number of rows
/// uploaded by that worker.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
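///
/// A sketch of the expected routing, matching the modulus used below when
/// picking an uploader: each row is assigned to a batch by
///
/// ```ignore
/// let batch = row.hashed() % output_batch_count;
/// ```
///
/// so a given worker only ever sees rows for the batches exchanged to it.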
fn render_upload_operator<G, T>(
    scope: G,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
    up_to: Antichain<G::Timestamp>,
    start_stream: Stream<G, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
    let (completion_handle, completion_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        let [completion_cap] = caps.try_into().unwrap();

        // Drain any events in the start stream. Once this stream advances to the empty frontier we
        // can proceed with uploading. If we see an error in this stream, forward it to the completion
        // stream and early-exit.
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

        // fallible async block to use the `?` operator for convenience
        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                .await?;

            // A map of uploaders, one per batch.
            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

            // As a special case, the 0th worker always forces a file to be
            // created for batch 0, even if it never sees any data for batch 0.
            // This ensures that we always write at least one file to S3, even
            // if the input is empty. See database-issue#8599.
            if worker_id == 0 {
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            // We're consuming a batch of data here, and the upstream operator is
                            // responsible for exchanging the data so that each worker receives
                            // exactly the rows belonging to its assigned batches.
                            let batch = row.hashed() % output_batch_count;
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    tracing::error!(
                                        %sink_id, %diff, ?row,
                                        "S3 oneshot sink encountered negative multiplicities",
                                    );
                                    anyhow::bail!(
                                        "Invalid data in source, \
                                         saw retractions ({}) for row that does not exist: {:?}",
                                        -*diff,
                                        row,
                                    )
                                }
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            // We are done, send the final count.
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}

/// dyncfg parameters for the copy_to operator, stored in this struct to avoid
/// requiring storage to depend on the compute crate. See `src/compute-types/src/dyncfgs.rs`
/// for the definition of these parameters.
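///
/// A hedged arithmetic sketch of how the percentage ratios relate the buffer
/// sizes, assuming a hypothetical `max_file_size`; the authoritative logic
/// lives in the `parquet` module:
///
/// ```ignore
/// // e.g. max_file_size = 256 MiB, parquet_row_group_ratio = 20, arrow_builder_buffer_ratio = 150
/// let row_group_size = max_file_size * parquet_row_group_ratio / 100;        // 20% of the max file size
/// let arrow_buffer_size = row_group_size * arrow_builder_buffer_ratio / 100; // 150% of the row-group size
/// ```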
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// The ratio (defined as a percentage) of row-group size to max-file-size.
    /// See the `parquet` module for more details on how this is used.
    pub parquet_row_group_ratio: usize,
    /// The ratio (defined as a percentage) of arrow-builder size to row-group size.
    /// See the `parquet` module for more details on how this is used.
    pub arrow_builder_buffer_ratio: usize,
    /// The size of each part in the multi-part upload to use when uploading files to S3.
    pub s3_multipart_part_size_bytes: usize,
}

/// This trait is used to abstract over the upload details for different file formats.
/// Each format has its own buffering semantics and upload logic, since some can be
/// written in a streaming fashion row-by-row, whereas others use a columnar-based
/// format that requires buffering a batch of rows before writing to S3.
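///
/// # Usage (sketch)
///
/// The upload operator in this module drives an uploader roughly as follows,
/// with `rows_for_this_batch` standing in for the rows routed to one batch
/// (types, clones, and error handling elided):
///
/// ```ignore
/// let mut uploader = T::new(sdk_config, connection_details, &sink_id, batch, params)?;
/// for row in rows_for_this_batch {
///     uploader.append_row(&row).await?;
/// }
/// uploader.finish().await?;
/// ```
///
/// Worker 0 additionally calls `force_new_file` for batch 0 so that at least
/// one object is always written, even for empty inputs.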
trait CopyToS3Uploader: Sized {
    /// Create an uploader for a single batch of the given sink.
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Force the start of a new file, even if no rows have yet been appended or
    /// if the current file has not yet reached the configured `max_file_size`.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Append a row to the internal buffer, and optionally flush the buffer to S3.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flush the full remaining internal buffer to S3, and close all open resources.
    /// This will be called when the input stream is finished.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}