Skip to main content

mz_adapter/coord/sequencer/inner/
copy_from.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_ore::cast::CastInto;
14use mz_persist_client::batch::ProtoBatch;
15use mz_pgcopy::CopyFormatParams;
16use mz_repr::{CatalogItemId, Datum, NotNullViolation, RowArena};
17use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_storage_client::client::TableData;
20use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
21use smallvec::SmallVec;
22use url::Url;
23use uuid::Uuid;
24
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
27use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
28use crate::session::{TransactionOps, WriteOp};
29use crate::{AdapterError, ExecuteContext, ExecuteResponse};
30
31impl Coordinator {
32    pub(crate) async fn sequence_copy_from(
33        &mut self,
34        ctx: ExecuteContext,
35        plan: plan::CopyFromPlan,
36        target_cluster: TargetCluster,
37    ) {
38        let plan::CopyFromPlan {
39            target_name: _,
40            target_id,
41            source,
42            columns: _,
43            source_desc,
44            mfp,
45            params,
46            filter,
47        } = plan;
48
49        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
50            let style = ExprPrepOneShot {
51                logical_time: EvalTime::NotAvailable,
52                session: ctx.session(),
53                catalog_state: self.catalog().state(),
54            };
55            let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
56            style.prep_scalar_expr(&mut from)?;
57
58            // TODO(cf3): Add structured errors for the below uses of `coord_bail!`
59            // and AdapterError::Unstructured.
60            let temp_storage = RowArena::new();
61            let eval_result = from.eval(&[], &temp_storage)?;
62            let eval_string = match eval_result {
63                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
64                Datum::String(url_str) => url_str,
65                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
66            };
67
68            Ok(eval_string.to_string())
69        };
70
71        // We check in planning that we're copying into a Table, but be defensive.
72        let Some(dest_table) = self.catalog().get_entry(&target_id).table() else {
73            let typ = self.catalog().get_entry(&target_id).item().typ();
74            let msg = format!("programming error: expected a Table found {typ:?}");
75            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
76        };
77
78        // Generate a unique UUID for our ingestion.
79        let ingestion_id = Uuid::new_v4();
80        let collection_id = dest_table.global_id_writes();
81
82        let format = match params {
83            CopyFormatParams::Csv(csv) => {
84                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
85            }
86            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
87            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
88                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
89                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
90                return;
91            }
92        };
93
94        let source = match source {
95            CopyFromSource::Url(from_expr) => {
96                let url = return_if_err!(eval_uri(from_expr), ctx);
97                // TODO(cf2): Structured errors.
98                let result = Url::parse(&url)
99                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
100                let url = return_if_err!(result, ctx);
101
102                mz_storage_types::oneshot_sources::ContentSource::Http { url }
103            }
104            CopyFromSource::AwsS3 {
105                uri,
106                connection,
107                connection_id,
108            } => {
109                let uri = return_if_err!(eval_uri(uri), ctx);
110
111                // Validate the URI is an S3 URI, with a bucket name. We rely on validating here
112                // and expect it in clusterd.
113                //
114                // TODO(cf2): Structured errors.
115                let result = http::Uri::from_str(&uri)
116                    .map_err(|err| {
117                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
118                    })
119                    .and_then(|uri| {
120                        if uri.scheme_str() != Some("s3") {
121                            coord_bail!("only 's3://...' urls are supported as COPY FROM target");
122                        }
123                        Ok(uri)
124                    })
125                    .and_then(|uri| {
126                        if uri.host().is_none() {
127                            coord_bail!("missing bucket name from 's3://...' url");
128                        }
129                        Ok(uri)
130                    });
131                let uri = return_if_err!(result, ctx);
132
133                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
134                    connection,
135                    connection_id,
136                    uri: uri.to_string(),
137                }
138            }
139            CopyFromSource::Stdin => {
140                unreachable!("COPY FROM STDIN should be handled elsewhere")
141            }
142        };
143
144        let filter = match filter {
145            None => mz_storage_types::oneshot_sources::ContentFilter::None,
146            Some(CopyFromFilter::Files(files)) => {
147                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
148            }
149            Some(CopyFromFilter::Pattern(pattern)) => {
150                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
151            }
152        };
153
154        let source_mfp = mfp
155            .into_plan()
156            .map_err(|s| AdapterError::internal("copy_from", s))
157            .and_then(|mfp| {
158                mfp.into_nontemporal().map_err(|_| {
159                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
160                })
161            });
162        let source_mfp = return_if_err!(source_mfp, ctx);
163
164        // Validate that all non-nullable columns in the target table will be populated.
165        let target_desc = dest_table.desc.latest();
166        for (col_idx, col_type) in target_desc.iter_types().enumerate() {
167            if !col_type.nullable {
168                // Check what value the MFP will produce for this column position.
169                if let Some(&projection_idx) = source_mfp.projection.get(col_idx) {
170                    // If the projection index is beyond the input arity, it references an expression.
171                    let input_arity = source_mfp.input_arity;
172                    if projection_idx >= input_arity {
173                        let expr_idx = projection_idx - input_arity;
174                        if let Some(expr) = source_mfp.expressions.get(expr_idx) {
175                            // Check if the expression is a NULL literal.
176                            // A NULL literal is represented as Literal(Ok(empty_row), _)
177                            if matches!(
178                                expr,
179                                mz_expr::MirScalarExpr::Literal(Ok(row), _)
180                                    if row.iter().next().map(|d| d.is_null()).unwrap_or(false)
181                            ) {
182                                let col_name = target_desc.get_name(col_idx);
183                                return ctx.retire(Err(AdapterError::ConstraintViolation(
184                                    NotNullViolation(col_name.clone()),
185                                )));
186                            }
187                        }
188                    }
189                } else {
190                    // If there's no projection for this column, that's a validation error
191                    let col_name = target_desc.get_name(col_idx);
192                    return ctx.retire(Err(AdapterError::ConstraintViolation(NotNullViolation(
193                        col_name.clone(),
194                    ))));
195                }
196            }
197        }
198
199        let shape = ContentShape {
200            source_desc,
201            source_mfp,
202        };
203
204        let request = OneshotIngestionRequest {
205            source,
206            format,
207            filter,
208            shape,
209        };
210
211        let target_cluster = match self
212            .catalog()
213            .resolve_target_cluster(target_cluster, ctx.session())
214        {
215            Ok(cluster) => cluster,
216            Err(err) => {
217                return ctx.retire(Err(err));
218            }
219        };
220        let cluster_id = target_cluster.id;
221
222        // When we finish staging the Batches in Persist, we'll send a command
223        // to the Coordinator.
224        let command_tx = self.internal_cmd_tx.clone();
225        let conn_id = ctx.session().conn_id().clone();
226        let closure = Box::new(move |batches| {
227            let _ = command_tx.send(crate::coord::Message::StagedBatches {
228                conn_id,
229                table_id: target_id,
230                batches,
231            });
232        });
233        // Stash the execute context so we can cancel the COPY.
234        let conn_id = ctx.session().conn_id().clone();
235        self.active_copies.insert(
236            conn_id,
237            ActiveCopyFrom {
238                ingestion_id,
239                cluster_id,
240                table_id: target_id,
241                ctx,
242            },
243        );
244
245        let _result = self
246            .controller
247            .storage
248            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
249            .await;
250    }
251
252    pub(crate) fn commit_staged_batches(
253        &mut self,
254        conn_id: ConnectionId,
255        table_id: CatalogItemId,
256        batches: Vec<Result<ProtoBatch, String>>,
257    ) {
258        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
259            // Getting a successful response for a cancel COPY FROM is unexpected.
260            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
261            return;
262        };
263
264        let ActiveCopyFrom {
265            ingestion_id,
266            cluster_id: _,
267            table_id: _,
268            mut ctx,
269        } = active_copy;
270        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
271
272        let mut all_batches = SmallVec::with_capacity(batches.len());
273        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
274        let mut row_count = 0u64;
275
276        for maybe_batch in batches {
277            match maybe_batch {
278                Ok(batch) => {
279                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
280                    all_batches.push(batch);
281                    row_count = row_count.saturating_add(count);
282                }
283                Err(err) => all_errors.push(err),
284            }
285        }
286
287        // If we got any errors we need to fail the whole operation.
288        if let Some(error) = all_errors.pop() {
289            tracing::warn!(?error, ?all_errors, "failed COPY FROM");
290
291            // TODO(cf1): Cleanup the existing ProtoBatches to prevent leaking them.
292            // TODO(cf2): Carry structured errors all the way through.
293
294            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
295                "COPY FROM: {error}"
296            ))));
297
298            return;
299        }
300
301        // Stage a WriteOp, then when the Session is retired we complete the
302        // transaction, which handles acquiring the write lock for `table_id`,
303        // advancing the timestamps of the staged batches, and waiting for
304        // everything to complete before sending a response to the client.
305        let stage_write = ctx
306            .session_mut()
307            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
308                id: table_id,
309                rows: TableData::Batches(all_batches),
310            }]));
311
312        if let Err(err) = stage_write {
313            ctx.retire(Err(err));
314        } else {
315            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
316        }
317    }
318
319    /// Cancel any active `COPY FROM` statements/oneshot ingestions.
320    #[mz_ore::instrument(level = "debug")]
321    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
322        if let Some(ActiveCopyFrom {
323            ingestion_id,
324            cluster_id: _,
325            table_id: _,
326            ctx,
327        }) = self.active_copies.remove(conn_id)
328        {
329            let cancel_result = self
330                .controller
331                .storage
332                .cancel_oneshot_ingestion(ingestion_id);
333            if let Err(err) = cancel_result {
334                tracing::error!(?err, "failed to cancel OneshotIngestion");
335            }
336
337            ctx.retire(Err(AdapterError::Canceled));
338        }
339    }
340}