Skip to main content

mz_storage_operators/
s3_oneshot_sink.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Uploads a consolidated collection to S3
11
12use std::any::Any;
13use std::collections::BTreeMap;
14use std::collections::btree_map::Entry;
15use std::rc::Rc;
16
17use anyhow::anyhow;
18use aws_types::sdk_config::SdkConfig;
19use differential_dataflow::Hashable;
20use futures::StreamExt;
21use mz_ore::cast::CastFrom;
22use mz_ore::error::ErrorExt;
23use mz_ore::future::InTask;
24use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
25use mz_storage_types::connections::ConnectionContext;
26use mz_storage_types::connections::aws::AwsConnection;
27use mz_storage_types::errors::DataflowError;
28use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
29use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
30use mz_timely_util::builder_async::{
31    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use mz_timely_util::columnation::ColumnationStack;
34use timely::PartialOrder;
35use timely::container::CapacityContainerBuilder;
36use timely::dataflow::channels::pact::{Exchange, Pipeline};
37use timely::dataflow::operators::vec::Broadcast;
38use timely::dataflow::{Scope, StreamVec};
39use timely::progress::Antichain;
40use tracing::debug;
41
42mod parquet;
43mod pgcopy;
44
45/// Copy the rows from the input collection to s3.
46/// `worker_callback` is used to send the final count of rows uploaded to s3,
47/// or an error message if the operator failed. This is per-worker, and
48/// these responses are aggregated upstream by the compute client.
49/// `sink_id` is used to identify the sink for logging purposes and as a
50/// unique prefix for files created by the sink.
51///
52/// This renders 3 operators used to coordinate the upload:
53///   - initialization: confirms the S3 path is empty and writes any sentinel files
54///   - upload: uploads data to S3
55///   - completion: removes the sentinel file and calls the `worker_callback`
56///
57/// Returns a token that should be held to keep the sink alive.
58///
59/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
60/// modulo the number of batches.
61pub fn copy_to<'scope, F>(
62    input_collection: StreamVec<
63        'scope,
64        Timestamp,
65        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
66    >,
67    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
68    up_to: Antichain<Timestamp>,
69    connection_details: S3UploadInfo,
70    connection_context: ConnectionContext,
71    aws_connection: AwsConnection,
72    sink_id: GlobalId,
73    connection_id: CatalogItemId,
74    params: CopyToParameters,
75    worker_callback: F,
76    output_batch_count: u64,
77) -> Rc<dyn Any>
78where
79    F: FnOnce(Result<u64, String>) -> () + 'static,
80{
81    let scope = input_collection.scope();
82
83    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);
84
85    let (start_stream, start_token) =
86        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);
87
88    let (completion_stream, upload_token) = match connection_details.format {
89        S3SinkFormat::PgCopy(_) => render_upload_operator::<pgcopy::PgCopyUploader>(
90            scope.clone(),
91            connection_context.clone(),
92            aws_connection.clone(),
93            connection_id,
94            connection_details,
95            sink_id,
96            input_collection,
97            up_to,
98            start_stream,
99            params,
100            output_batch_count,
101        ),
102        S3SinkFormat::Parquet => render_upload_operator::<parquet::ParquetUploader>(
103            scope.clone(),
104            connection_context.clone(),
105            aws_connection.clone(),
106            connection_id,
107            connection_details,
108            sink_id,
109            input_collection,
110            up_to,
111            start_stream,
112            params,
113            output_batch_count,
114        ),
115    };
116
117    let completion_token = render_completion_operator(
118        scope,
119        connection_context,
120        aws_connection,
121        connection_id,
122        sink_id,
123        s3_key_manager,
124        completion_stream,
125        worker_callback,
126    );
127
128    Rc::new(vec![start_token, upload_token, completion_token])
129}
130
/// Renders the 'initialization' operator, which does work on the leader worker only.
///
/// The leader worker receives all errors from the `err_stream` and if it
/// encounters any errors will early exit and broadcast the error on the
/// returned `start_stream`, to avoid any unnecessary work across all workers.
///
/// Returns the `start_stream` with an error received in the `err_stream`, if
/// any, otherwise `Ok(())`.
fn render_initialization_operator<'scope>(
    scope: Scope<'scope, Timestamp>,
    sink_id: GlobalId,
    up_to: Antichain<Timestamp>,
    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
) -> (
    StreamVec<'scope, Timestamp, Result<(), String>>,
    PressOnDropButton,
) {
    let worker_id = scope.index();
    let num_workers = scope.peers();
    // Deterministically elect a leader by hashing the sink id (salted with
    // "initialization" so it need not match other operators' leaders); every
    // worker computes the same value without coordination.
    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
    let is_leader = worker_id == leader_id;

    let mut builder =
        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());

    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Push all errors to the leader worker, so it early exits before doing any initialization work
    // This should be at-most 1 incoming error per-worker due to the filtering of this stream
    // in `CopyToS3OneshotSinkConnection::render_continuous_sink`.
    let mut error_handle = builder.new_input_for(
        err_stream,
        Exchange::new(move |_| u64::cast_from(leader_id)),
        &start_handle,
    );

    let button = builder.build(move |caps| async move {
        // Exactly one output, so exactly one capability.
        let [start_cap] = caps.try_into().unwrap();

        while let Some(event) = error_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for (error, ts, _) in data {
                        // Only errors at times within the sink's range (i.e. not
                        // beyond `up_to`) are relevant; emit the first one and
                        // stop — dropping `start_cap` here ends the operator.
                        if !up_to.less_equal(&ts) {
                            start_handle.give(&cap, Err(error.to_string()));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(frontier) => {
                    // Once the error stream's frontier has passed `up_to`, no
                    // more relevant errors can arrive.
                    if PartialOrder::less_equal(&up_to, &frontier) {
                        // No error, break from loop and proceed
                        break;
                    }
                }
            }
        }

        // Only the leader emits the `Ok(())` go-signal; other workers simply
        // drop their capability and rely on the broadcast below.
        if !is_leader {
            return;
        }

        start_handle.give(&start_cap, Ok(()));
    });

    // Broadcast the result to all workers so that they will all see any error that occurs
    // and exit before doing any unnecessary work
    (start_stream.broadcast(), button.press_on_drop())
}
200
201/// Renders the 'completion' operator, which expects a `completion_stream`
202/// that it reads over a Pipeline edge such that it receives a single
203/// completion event per worker. Then forwards this result to the
204/// `worker_callback` after any cleanup work (see below).
205///
206/// On the leader worker, this operator waits to see the empty frontier for
207/// the completion_stream and then does some cleanup work before calling
208/// the callback.
209///
210/// This cleanup work removes the INCOMPLETE sentinel file (see description
211/// of `render_initialization_operator` for more details).
212fn render_completion_operator<'scope, F>(
213    scope: Scope<'scope, Timestamp>,
214    connection_context: ConnectionContext,
215    aws_connection: AwsConnection,
216    connection_id: CatalogItemId,
217    sink_id: GlobalId,
218    s3_key_manager: S3KeyManager,
219    completion_stream: StreamVec<'scope, Timestamp, Result<u64, String>>,
220    worker_callback: F,
221) -> PressOnDropButton
222where
223    F: FnOnce(Result<u64, String>) -> () + 'static,
224{
225    let worker_id = scope.index();
226    let num_workers = scope.peers();
227    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
228    let is_leader = worker_id == leader_id;
229
230    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());
231
232    let mut completion_input = builder.new_disconnected_input(completion_stream, Pipeline);
233
234    let button = builder.build(move |_| async move {
235        // fallible async block to use the `?` operator for convenience
236        let fallible_logic = async move {
237            let mut row_count = None;
238            while let Some(event) = completion_input.next().await {
239                if let AsyncEvent::Data(_ts, data) = event {
240                    for result in data {
241                        assert!(
242                            row_count.is_none(),
243                            "unexpectedly received more than 1 event on the completion stream!"
244                        );
245                        row_count = Some(result.map_err(|e| anyhow!(e))?);
246                    }
247                }
248            }
249            let row_count = row_count.expect("did not receive completion event");
250
251            if is_leader {
252                debug!(%sink_id, %worker_id, "s3 leader worker completion");
253                let sdk_config = aws_connection
254                    .load_sdk_config(&connection_context, connection_id, InTask::Yes)
255                    .await?;
256
257                let client = mz_aws_util::s3::new_client(&sdk_config);
258                let bucket = s3_key_manager.bucket.clone();
259                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();
260
261                // Remove the INCOMPLETE sentinel file to indicate that the upload is complete.
262                // This will race against other replicas who are completing the same uploads,
263                // such that the first replica to complete its uploads will delete the sentinel
264                // and the subsequent replicas shouldn't error if the object is already deleted.
265                // TODO: Should we also write a manifest of all the files uploaded?
266                mz_ore::task::spawn(|| "copytos3:completion", async move {
267                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
268                    client
269                        .delete_object()
270                        .bucket(bucket)
271                        .key(incomplete_sentinel_key)
272                        .send()
273                        .await?;
274                    Ok::<(), anyhow::Error>(())
275                })
276                .await?;
277            }
278            Ok::<u64, anyhow::Error>(row_count)
279        };
280
281        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
282    });
283
284    button.press_on_drop()
285}
286
/// Renders the `upload operator`, which waits on the `start_stream` to ensure
/// initialization is complete and then handles the uploads to S3.
/// Returns a `completion_stream` which contains 1 event per worker of
/// the result of the upload operation, either an error or the number of rows
/// uploaded by the worker.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
fn render_upload_operator<'scope, T>(
    scope: Scope<'scope, Timestamp>,
    connection_context: ConnectionContext,
    aws_connection: AwsConnection,
    connection_id: CatalogItemId,
    connection_details: S3UploadInfo,
    sink_id: GlobalId,
    input_collection: StreamVec<
        'scope,
        Timestamp,
        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
    >,
    up_to: Antichain<Timestamp>,
    start_stream: StreamVec<'scope, Timestamp, Result<(), String>>,
    params: CopyToParameters,
    output_batch_count: u64,
) -> (
    StreamVec<'scope, Timestamp, Result<u64, String>>,
    PressOnDropButton,
)
where
    T: CopyToS3Uploader,
{
    let worker_id = scope.index();
    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

    // The data input is "disconnected" while the start stream is connected to
    // the completion output (so errors can be forwarded with its capability).
    let mut input_handle = builder.new_disconnected_input(input_collection, Pipeline);
    let (completion_handle, completion_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut start_handle = builder.new_input_for(start_stream, Pipeline, &completion_handle);

    let button = builder.build(move |caps| async move {
        // Exactly one output, so exactly one capability.
        let [completion_cap] = caps.try_into().unwrap();

        // Drain any events in the start stream. Once this stream advances to the empty frontier we
        // can proceed with uploading. If we see an error in this stream, forward it to the completion
        // stream and early-exit.
        while let Some(event) = start_handle.next().await {
            match event {
                AsyncEvent::Data(cap, data) => {
                    for res in data {
                        if res.is_err() {
                            // Forward the error as this worker's completion
                            // result (the `map` just adjusts the Ok type).
                            completion_handle.give(&cap, res.map(|_| 0));
                            return;
                        }
                    }
                }
                AsyncEvent::Progress(_) => {}
            }
        }

        // fallible async block to use the `?` operator for convenience
        let res = async move {
            let sdk_config = aws_connection
                .load_sdk_config(&connection_context, connection_id, InTask::Yes)
                .await?;

            // Map of an uploader per batch.
            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();

            // As a special case, the 0th worker always forces a file to be
            // created for batch 0, even if it never sees any data for batch 0.
            // This ensures that we always write at least one file to S3, even
            // if the input is empty. See database-issue#8599.
            if worker_id == 0 {
                let mut uploader = T::new(
                    sdk_config.clone(),
                    connection_details.clone(),
                    &sink_id,
                    0,
                    params.clone(),
                )?;
                uploader.force_new_file().await?;
                s3_uploaders.insert(0, uploader);
            }

            // Total number of rows (accounting for multiplicities) uploaded by
            // this worker; reported on the completion stream.
            let mut row_count = 0;
            while let Some(event) = input_handle.next().await {
                match event {
                    AsyncEvent::Data(_ts, data) => {
                        for ((row, ()), ts, diff) in
                            data.iter().flatten().flat_map(|chunk| chunk.iter())
                        {
                            // We're consuming a batch of data, and the upstream operator has to ensure
                            // that the data is exchanged according to the batch.
                            let batch = row.hashed() % output_batch_count;
                            // Only rows at times within the sink's range (not
                            // beyond `up_to`) belong in the snapshot.
                            if !up_to.less_equal(ts) {
                                if diff.is_negative() {
                                    // A consolidated snapshot should never contain
                                    // retractions; treat this as invalid input.
                                    tracing::error!(
                                        %sink_id, %diff, ?row,
                                        "S3 oneshot sink encountered negative multiplicities",
                                    );
                                    anyhow::bail!(
                                        "Invalid data in source, \
                                         saw retractions ({}) for row that does not exist: {:?}",
                                        -*diff,
                                        row,
                                    )
                                }
                                // Safe: negative diffs were rejected above.
                                row_count += u64::try_from(diff.into_inner()).unwrap();
                                // Lazily create one uploader per batch this worker owns.
                                let uploader = match s3_uploaders.entry(batch) {
                                    Entry::Occupied(entry) => entry.into_mut(),
                                    Entry::Vacant(entry) => {
                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
                                        entry.insert(T::new(
                                            sdk_config.clone(),
                                            connection_details.clone(),
                                            &sink_id,
                                            batch,
                                            params.clone(),
                                        )?)
                                    }
                                };
                                // Append the row once per unit of multiplicity.
                                for _ in 0..diff.into_inner() {
                                    uploader.append_row(row).await?;
                                }
                            }
                        }
                    }
                    AsyncEvent::Progress(frontier) => {
                        // Once the input frontier passes `up_to`, all relevant
                        // data has been seen: flush every uploader and finish.
                        if PartialOrder::less_equal(&up_to, &frontier) {
                            for uploader in s3_uploaders.values_mut() {
                                uploader.finish().await?;
                            }
                            // We are done, send the final count.
                            return Ok(row_count);
                        }
                    }
                }
            }
            Ok::<u64, anyhow::Error>(row_count)
        }
        .await;

        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
    });

    (completion_stream, button.press_on_drop())
}
434
/// dyncfg parameters for the copy_to operator, stored in this struct to avoid
/// requiring storage to depend on the compute crate. See `src/compute-types/src/dyncfgs.rs`
/// for the definition of these parameters.
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// The ratio (defined as a percentage) of row-group size to max-file-size.
    /// See the `parquet` module for more details on how this is used.
    pub parquet_row_group_ratio: usize,
    /// The ratio (defined as a percentage) of arrow-builder size to row-group size.
    /// See the `parquet` module for more details on how this is used.
    pub arrow_builder_buffer_ratio: usize,
    /// The size of each part in the multi-part upload to use when uploading files to S3.
    pub s3_multipart_part_size_bytes: usize,
}
449
/// This trait is used to abstract over the upload details for different file formats.
/// Each format has its own buffering semantics and upload logic, since some can be
/// written in a streaming fashion row-by-row, whereas others use a columnar-based
/// format that requires buffering a batch of rows before writing to S3.
trait CopyToS3Uploader: Sized {
    /// Construct an uploader responsible for a single `batch` of this sink's
    /// output. May fail if the uploader cannot be set up from the given
    /// connection details or parameters.
    fn new(
        sdk_config: SdkConfig,
        connection_details: S3UploadInfo,
        sink_id: &GlobalId,
        batch: u64,
        params: CopyToParameters,
    ) -> Result<Self, anyhow::Error>;
    /// Force the start of a new file, even if no rows have yet been appended or
    /// if the current file has not yet reached the configured `max_file_size`.
    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
    /// Append a row to the internal buffer, and optionally flush the buffer to S3.
    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
    /// Flush the full remaining internal buffer to S3, and close all open resources.
    /// This will be called when the input stream is finished.
    async fn finish(&mut self) -> Result<(), anyhow::Error>;
}