// mz_storage_operators/s3_oneshot_sink.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Uploads a consolidated collection to S3

12use std::any::Any;
13use std::collections::BTreeMap;
14use std::collections::btree_map::Entry;
15use std::rc::Rc;
16
17use anyhow::anyhow;
18use aws_types::sdk_config::SdkConfig;
19use differential_dataflow::Hashable;
20use futures::StreamExt;
21use mz_ore::cast::CastFrom;
22use mz_ore::error::ErrorExt;
23use mz_ore::future::InTask;
24use mz_repr::{CatalogItemId, Diff, GlobalId, Row, Timestamp};
25use mz_storage_types::connections::ConnectionContext;
26use mz_storage_types::connections::aws::AwsConnection;
27use mz_storage_types::errors::DataflowError;
28use mz_storage_types::sinks::s3_oneshot_sink::S3KeyManager;
29use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
30use mz_timely_util::builder_async::{
31    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
32};
33use mz_timely_util::columnation::ColumnationStack;
34use timely::PartialOrder;
35use timely::container::CapacityContainerBuilder;
36use timely::dataflow::channels::pact::{Exchange, Pipeline};
37use timely::dataflow::operators::vec::Broadcast;
38use timely::dataflow::{Scope, StreamVec};
39use timely::progress::Antichain;
40use tracing::debug;
41
42mod parquet;
43mod pgcopy;
44
45/// Copy the rows from the input collection to s3.
46/// `worker_callback` is used to send the final count of rows uploaded to s3,
47/// or an error message if the operator failed. This is per-worker, and
48/// these responses are aggregated upstream by the compute client.
49/// `sink_id` is used to identify the sink for logging purposes and as a
50/// unique prefix for files created by the sink.
51///
52/// This renders 3 operators used to coordinate the upload:
53///   - initialization: confirms the S3 path is empty and writes any sentinel files
54///   - upload: uploads data to S3
55///   - completion: removes the sentinel file and calls the `worker_callback`
56///
57/// Returns a token that should be held to keep the sink alive.
58///
59/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
60/// modulo the number of batches.
61pub fn copy_to<'scope, F>(
62    input_collection: StreamVec<
63        'scope,
64        Timestamp,
65        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
66    >,
67    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
68    up_to: Antichain<Timestamp>,
69    connection_details: S3UploadInfo,
70    connection_context: ConnectionContext,
71    aws_connection: AwsConnection,
72    sink_id: GlobalId,
73    connection_id: CatalogItemId,
74    params: CopyToParameters,
75    worker_callback: F,
76    output_batch_count: u64,
77    enforce_external_addresses: bool,
78) -> Rc<dyn Any>
79where
80    F: FnOnce(Result<u64, String>) -> () + 'static,
81{
82    let scope = input_collection.scope();
83
84    let s3_key_manager = S3KeyManager::new(&sink_id, &connection_details.uri);
85
86    let (start_stream, start_token) =
87        render_initialization_operator(scope.clone(), sink_id, up_to.clone(), err_stream);
88
89    let (completion_stream, upload_token) = match connection_details.format {
90        S3SinkFormat::PgCopy(_) => render_upload_operator::<pgcopy::PgCopyUploader>(
91            scope.clone(),
92            connection_context.clone(),
93            aws_connection.clone(),
94            connection_id,
95            connection_details,
96            sink_id,
97            input_collection,
98            up_to,
99            start_stream,
100            params,
101            output_batch_count,
102            enforce_external_addresses,
103        ),
104        S3SinkFormat::Parquet => render_upload_operator::<parquet::ParquetUploader>(
105            scope.clone(),
106            connection_context.clone(),
107            aws_connection.clone(),
108            connection_id,
109            connection_details,
110            sink_id,
111            input_collection,
112            up_to,
113            start_stream,
114            params,
115            output_batch_count,
116            enforce_external_addresses,
117        ),
118    };
119
120    let completion_token = render_completion_operator(
121        scope,
122        connection_context,
123        aws_connection,
124        connection_id,
125        sink_id,
126        s3_key_manager,
127        completion_stream,
128        worker_callback,
129        enforce_external_addresses,
130    );
131
132    Rc::new(vec![start_token, upload_token, completion_token])
133}
134
135/// Renders the 'initialization' operator, which does work on the leader worker only.
136///
137/// The leader worker receives all errors from the `err_stream` and if it
138/// encounters any errors will early exit and broadcast the error on the
139/// returned `start_stream`, to avoid any unnecessary work across all workers.
140///
141/// Returns the `start_stream` with an error received in the `err_stream`, if
142/// any, otherwise `Ok(())`.
143fn render_initialization_operator<'scope>(
144    scope: Scope<'scope, Timestamp>,
145    sink_id: GlobalId,
146    up_to: Antichain<Timestamp>,
147    err_stream: StreamVec<'scope, Timestamp, (DataflowError, Timestamp, Diff)>,
148) -> (
149    StreamVec<'scope, Timestamp, Result<(), String>>,
150    PressOnDropButton,
151) {
152    let worker_id = scope.index();
153    let num_workers = scope.peers();
154    let leader_id = usize::cast_from((sink_id, "initialization").hashed()) % num_workers;
155    let is_leader = worker_id == leader_id;
156
157    let mut builder =
158        AsyncOperatorBuilder::new("CopyToS3-initialization".to_string(), scope.clone());
159
160    let (start_handle, start_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
161
162    // Push all errors to the leader worker, so it early exits before doing any initialization work
163    // This should be at-most 1 incoming error per-worker due to the filtering of this stream
164    // in `CopyToS3OneshotSinkConnection::render_continuous_sink`.
165    let mut error_handle = builder.new_input_for(
166        err_stream,
167        Exchange::new(move |_| u64::cast_from(leader_id)),
168        &start_handle,
169    );
170
171    let button = builder.build(move |caps| async move {
172        let [start_cap] = caps.try_into().unwrap();
173
174        while let Some(event) = error_handle.next().await {
175            match event {
176                AsyncEvent::Data(cap, data) => {
177                    for (error, ts, _) in data {
178                        if !up_to.less_equal(&ts) {
179                            start_handle.give(&cap, Err(error.to_string()));
180                            return;
181                        }
182                    }
183                }
184                AsyncEvent::Progress(frontier) => {
185                    if PartialOrder::less_equal(&up_to, &frontier) {
186                        // No error, break from loop and proceed
187                        break;
188                    }
189                }
190            }
191        }
192
193        if !is_leader {
194            return;
195        }
196
197        start_handle.give(&start_cap, Ok(()));
198    });
199
200    // Broadcast the result to all workers so that they will all see any error that occurs
201    // and exit before doing any unnecessary work
202    (start_stream.broadcast(), button.press_on_drop())
203}
204
205/// Renders the 'completion' operator, which expects a `completion_stream`
206/// that it reads over a Pipeline edge such that it receives a single
207/// completion event per worker. Then forwards this result to the
208/// `worker_callback` after any cleanup work (see below).
209///
210/// On the leader worker, this operator waits to see the empty frontier for
211/// the completion_stream and then does some cleanup work before calling
212/// the callback.
213///
214/// This cleanup work removes the INCOMPLETE sentinel file (see description
215/// of `render_initialization_operator` for more details).
216fn render_completion_operator<'scope, F>(
217    scope: Scope<'scope, Timestamp>,
218    connection_context: ConnectionContext,
219    aws_connection: AwsConnection,
220    connection_id: CatalogItemId,
221    sink_id: GlobalId,
222    s3_key_manager: S3KeyManager,
223    completion_stream: StreamVec<'scope, Timestamp, Result<u64, String>>,
224    worker_callback: F,
225    enforce_external_addresses: bool,
226) -> PressOnDropButton
227where
228    F: FnOnce(Result<u64, String>) -> () + 'static,
229{
230    let worker_id = scope.index();
231    let num_workers = scope.peers();
232    let leader_id = usize::cast_from((sink_id, "completion").hashed()) % num_workers;
233    let is_leader = worker_id == leader_id;
234
235    let mut builder = AsyncOperatorBuilder::new("CopyToS3-completion".to_string(), scope.clone());
236
237    let mut completion_input = builder.new_disconnected_input(completion_stream, Pipeline);
238
239    let button = builder.build(move |_| async move {
240        // fallible async block to use the `?` operator for convenience
241        let fallible_logic = async move {
242            let mut row_count = None;
243            while let Some(event) = completion_input.next().await {
244                if let AsyncEvent::Data(_ts, data) = event {
245                    for result in data {
246                        assert!(
247                            row_count.is_none(),
248                            "unexpectedly received more than 1 event on the completion stream!"
249                        );
250                        row_count = Some(result.map_err(|e| anyhow!(e))?);
251                    }
252                }
253            }
254            let row_count = row_count.expect("did not receive completion event");
255
256            if is_leader {
257                debug!(%sink_id, %worker_id, "s3 leader worker completion");
258                let sdk_config = aws_connection
259                    .load_sdk_config(
260                        &connection_context,
261                        connection_id,
262                        InTask::Yes,
263                        enforce_external_addresses,
264                    )
265                    .await?;
266
267                let client = mz_aws_util::s3::new_client(&sdk_config);
268                let bucket = s3_key_manager.bucket.clone();
269                let incomplete_sentinel_key = s3_key_manager.incomplete_sentinel_key();
270
271                // Remove the INCOMPLETE sentinel file to indicate that the upload is complete.
272                // This will race against other replicas who are completing the same uploads,
273                // such that the first replica to complete its uploads will delete the sentinel
274                // and the subsequent replicas shouldn't error if the object is already deleted.
275                // TODO: Should we also write a manifest of all the files uploaded?
276                mz_ore::task::spawn(|| "copytos3:completion", async move {
277                    debug!(%sink_id, %worker_id, "removing INCOMPLETE sentinel file");
278                    client
279                        .delete_object()
280                        .bucket(bucket)
281                        .key(incomplete_sentinel_key)
282                        .send()
283                        .await?;
284                    Ok::<(), anyhow::Error>(())
285                })
286                .await?;
287            }
288            Ok::<u64, anyhow::Error>(row_count)
289        };
290
291        worker_callback(fallible_logic.await.map_err(|e| e.to_string_with_causes()));
292    });
293
294    button.press_on_drop()
295}
296
297/// Renders the `upload operator`, which waits on the `start_stream` to ensure
298/// initialization is complete and then handles the uploads to S3.
299/// Returns a `completion_stream` which contains 1 event per worker of
300/// the result of the upload operation, either an error or the number of rows
301/// uploaded by the worker.
302///
303/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
304/// modulo the number of batches.
305fn render_upload_operator<'scope, T>(
306    scope: Scope<'scope, Timestamp>,
307    connection_context: ConnectionContext,
308    aws_connection: AwsConnection,
309    connection_id: CatalogItemId,
310    connection_details: S3UploadInfo,
311    sink_id: GlobalId,
312    input_collection: StreamVec<
313        'scope,
314        Timestamp,
315        Vec<ColumnationStack<((Row, ()), Timestamp, Diff)>>,
316    >,
317    up_to: Antichain<Timestamp>,
318    start_stream: StreamVec<'scope, Timestamp, Result<(), String>>,
319    params: CopyToParameters,
320    output_batch_count: u64,
321    enforce_external_addresses: bool,
322) -> (
323    StreamVec<'scope, Timestamp, Result<u64, String>>,
324    PressOnDropButton,
325)
326where
327    T: CopyToS3Uploader,
328{
329    let worker_id = scope.index();
330    let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());
331
332    let mut input_handle = builder.new_disconnected_input(input_collection, Pipeline);
333    let (completion_handle, completion_stream) =
334        builder.new_output::<CapacityContainerBuilder<_>>();
335    let mut start_handle = builder.new_input_for(start_stream, Pipeline, &completion_handle);
336
337    let button = builder.build(move |caps| async move {
338        let [completion_cap] = caps.try_into().unwrap();
339
340        // Drain any events in the start stream. Once this stream advances to the empty frontier we
341        // can proceed with uploading. If we see an error in this stream, forward it to the completion
342        // stream and early-exit.
343        while let Some(event) = start_handle.next().await {
344            match event {
345                AsyncEvent::Data(cap, data) => {
346                    for res in data {
347                        if res.is_err() {
348                            completion_handle.give(&cap, res.map(|_| 0));
349                            return;
350                        }
351                    }
352                }
353                AsyncEvent::Progress(_) => {}
354            }
355        }
356
357        // fallible async block to use the `?` operator for convenience
358        let res = async move {
359            let sdk_config = aws_connection
360                .load_sdk_config(
361                    &connection_context,
362                    connection_id,
363                    InTask::Yes,
364                    enforce_external_addresses,
365                )
366                .await?;
367
368            // Map of an uploader per batch.
369            let mut s3_uploaders: BTreeMap<u64, T> = BTreeMap::new();
370
371            // As a special case, the 0th worker always forces a file to be
372            // created for batch 0, even if it never sees any data for batch 0.
373            // This ensures that we always write at least one file to S3, even
374            // if the input is empty. See database-issue#8599.
375            if worker_id == 0 {
376                let mut uploader = T::new(
377                    sdk_config.clone(),
378                    connection_details.clone(),
379                    &sink_id,
380                    0,
381                    params.clone(),
382                )?;
383                uploader.force_new_file().await?;
384                s3_uploaders.insert(0, uploader);
385            }
386
387            let mut row_count = 0;
388            while let Some(event) = input_handle.next().await {
389                match event {
390                    AsyncEvent::Data(_ts, data) => {
391                        for ((row, ()), ts, diff) in
392                            data.iter().flatten().flat_map(|chunk| chunk.iter())
393                        {
394                            // We're consuming a batch of data, and the upstream operator has to ensure
395                            // that the data is exchanged according to the batch.
396                            let batch = row.hashed() % output_batch_count;
397                            if !up_to.less_equal(ts) {
398                                if diff.is_negative() {
399                                    tracing::error!(
400                                        %sink_id, %diff, ?row,
401                                        "S3 oneshot sink encountered negative multiplicities",
402                                    );
403                                    anyhow::bail!(
404                                        "Invalid data in source, \
405                                         saw retractions ({}) for row that does not exist: {:?}",
406                                        -*diff,
407                                        row,
408                                    )
409                                }
410                                row_count += u64::try_from(diff.into_inner()).unwrap();
411                                let uploader = match s3_uploaders.entry(batch) {
412                                    Entry::Occupied(entry) => entry.into_mut(),
413                                    Entry::Vacant(entry) => {
414                                        debug!(%sink_id, %worker_id, "handling batch: {}", batch);
415                                        entry.insert(T::new(
416                                            sdk_config.clone(),
417                                            connection_details.clone(),
418                                            &sink_id,
419                                            batch,
420                                            params.clone(),
421                                        )?)
422                                    }
423                                };
424                                for _ in 0..diff.into_inner() {
425                                    uploader.append_row(row).await?;
426                                }
427                            }
428                        }
429                    }
430                    AsyncEvent::Progress(frontier) => {
431                        if PartialOrder::less_equal(&up_to, &frontier) {
432                            for uploader in s3_uploaders.values_mut() {
433                                uploader.finish().await?;
434                            }
435                            // We are done, send the final count.
436                            return Ok(row_count);
437                        }
438                    }
439                }
440            }
441            Ok::<u64, anyhow::Error>(row_count)
442        }
443        .await;
444
445        completion_handle.give(&completion_cap, res.map_err(|e| e.to_string_with_causes()));
446    });
447
448    (completion_stream, button.press_on_drop())
449}
450
/// dyncfg parameters for the copy_to operator, stored in this struct to avoid
/// requiring storage to depend on the compute crate. See `src/compute-types/src/dyncfgs.rs`
/// for the definition of these parameters.
#[derive(Clone, Debug)]
pub struct CopyToParameters {
    /// The ratio (defined as a percentage) of row-group size to max-file-size.
    /// See the `parquet` module for more details on how this is used.
    pub parquet_row_group_ratio: usize,
    /// The ratio (defined as a percentage) of arrow-builder size to row-group size.
    /// See the `parquet` module for more details on how this is used.
    pub arrow_builder_buffer_ratio: usize,
    /// The size of each part in the multi-part upload to use when uploading files to S3.
    pub s3_multipart_part_size_bytes: usize,
}
465
466/// This trait is used to abstract over the upload details for different file formats.
467/// Each format has its own buffering semantics and upload logic, since some can be
468/// written in a streaming fashion row-by-row, whereas others use a columnar-based
469/// format that requires buffering a batch of rows before writing to S3.
470trait CopyToS3Uploader: Sized {
471    fn new(
472        sdk_config: SdkConfig,
473        connection_details: S3UploadInfo,
474        sink_id: &GlobalId,
475        batch: u64,
476        params: CopyToParameters,
477    ) -> Result<Self, anyhow::Error>;
478    /// Force the start of a new file, even if no rows have yet been appended or
479    /// if the current file has not yet reached the configured `max_file_size`.
480    async fn force_new_file(&mut self) -> Result<(), anyhow::Error>;
481    /// Append a row to the internal buffer, and optionally flush the buffer to S3.
482    async fn append_row(&mut self, row: &Row) -> Result<(), anyhow::Error>;
483    /// Flush the full remaining internal buffer to S3, and close all open resources.
484    /// This will be called when the input stream is finished.
485    async fn finish(&mut self) -> Result<(), anyhow::Error>;
486}