mz_adapter/coord/sequencer/inner/
copy_from.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_ore::cast::CastInto;
14use mz_persist_client::batch::ProtoBatch;
15use mz_pgcopy::CopyFormatParams;
16use mz_repr::{CatalogItemId, Datum, RowArena};
17use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_storage_client::client::TableData;
20use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
21use smallvec::SmallVec;
22use url::Url;
23use uuid::Uuid;
24
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
27use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
28use crate::session::{TransactionOps, WriteOp};
29use crate::{AdapterError, ExecuteContext, ExecuteResponse};
30
31impl Coordinator {
32    pub(crate) async fn sequence_copy_from(
33        &mut self,
34        ctx: ExecuteContext,
35        plan: plan::CopyFromPlan,
36        target_cluster: TargetCluster,
37    ) {
38        let plan::CopyFromPlan {
39            id,
40            source,
41            columns: _,
42            source_desc,
43            mfp,
44            params,
45            filter,
46        } = plan;
47
48        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
49            let style = ExprPrepStyle::OneShot {
50                logical_time: EvalTime::NotAvailable,
51                session: ctx.session(),
52                catalog_state: self.catalog().state(),
53            };
54            let mut from = from.lower_uncorrelated()?;
55            prep_scalar_expr(&mut from, style)?;
56
57            // TODO(cf3): Add structured errors for the below uses of `coord_bail!`
58            // and AdapterError::Unstructured.
59            let temp_storage = RowArena::new();
60            let eval_result = from.eval(&[], &temp_storage)?;
61            let eval_string = match eval_result {
62                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
63                Datum::String(url_str) => url_str,
64                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
65            };
66
67            Ok(eval_string.to_string())
68        };
69
70        // We check in planning that we're copying into a Table, but be defensive.
71        let Some(dest_table) = self.catalog().get_entry(&id).table() else {
72            let typ = self.catalog().get_entry(&id).item().typ();
73            let msg = format!("programming error: expected a Table found {typ:?}");
74            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
75        };
76
77        // Generate a unique UUID for our ingestion.
78        let ingestion_id = Uuid::new_v4();
79        let collection_id = dest_table.global_id_writes();
80
81        let format = match params {
82            CopyFormatParams::Csv(csv) => {
83                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
84            }
85            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
86            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
87                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
88                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
89                return;
90            }
91        };
92
93        let source = match source {
94            CopyFromSource::Url(from_expr) => {
95                let url = return_if_err!(eval_uri(from_expr), ctx);
96                // TODO(cf2): Structured errors.
97                let result = Url::parse(&url)
98                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
99                let url = return_if_err!(result, ctx);
100
101                mz_storage_types::oneshot_sources::ContentSource::Http { url }
102            }
103            CopyFromSource::AwsS3 {
104                uri,
105                connection,
106                connection_id,
107            } => {
108                let uri = return_if_err!(eval_uri(uri), ctx);
109
110                // Validate the URI is an S3 URI, with a bucket name. We rely on validating here
111                // and expect it in clusterd.
112                //
113                // TODO(cf2): Structured errors.
114                let result = http::Uri::from_str(&uri)
115                    .map_err(|err| {
116                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
117                    })
118                    .and_then(|uri| {
119                        if uri.scheme_str() != Some("s3") {
120                            coord_bail!("only 's3://...' urls are supported as COPY FROM target");
121                        }
122                        Ok(uri)
123                    })
124                    .and_then(|uri| {
125                        if uri.host().is_none() {
126                            coord_bail!("missing bucket name from 's3://...' url");
127                        }
128                        Ok(uri)
129                    });
130                let uri = return_if_err!(result, ctx);
131
132                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
133                    connection,
134                    connection_id,
135                    uri: uri.to_string(),
136                }
137            }
138            CopyFromSource::Stdin => {
139                unreachable!("COPY FROM STDIN should be handled elsewhere")
140            }
141        };
142
143        let filter = match filter {
144            None => mz_storage_types::oneshot_sources::ContentFilter::None,
145            Some(CopyFromFilter::Files(files)) => {
146                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
147            }
148            Some(CopyFromFilter::Pattern(pattern)) => {
149                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
150            }
151        };
152
153        let source_mfp = mfp
154            .into_plan()
155            .map_err(|s| AdapterError::internal("copy_from", s))
156            .and_then(|mfp| {
157                mfp.into_nontemporal().map_err(|_| {
158                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
159                })
160            });
161        let source_mfp = return_if_err!(source_mfp, ctx);
162        let shape = ContentShape {
163            source_desc,
164            source_mfp,
165        };
166
167        let request = OneshotIngestionRequest {
168            source,
169            format,
170            filter,
171            shape,
172        };
173
174        let target_cluster = match self
175            .catalog()
176            .resolve_target_cluster(target_cluster, ctx.session())
177        {
178            Ok(cluster) => cluster,
179            Err(err) => {
180                return ctx.retire(Err(err));
181            }
182        };
183        let cluster_id = target_cluster.id;
184
185        // When we finish staging the Batches in Persist, we'll send a command
186        // to the Coordinator.
187        let command_tx = self.internal_cmd_tx.clone();
188        let conn_id = ctx.session().conn_id().clone();
189        let closure = Box::new(move |batches| {
190            let _ = command_tx.send(crate::coord::Message::StagedBatches {
191                conn_id,
192                table_id: id,
193                batches,
194            });
195        });
196        // Stash the execute context so we can cancel the COPY.
197        let conn_id = ctx.session().conn_id().clone();
198        self.active_copies.insert(
199            conn_id,
200            ActiveCopyFrom {
201                ingestion_id,
202                cluster_id,
203                table_id: id,
204                ctx,
205            },
206        );
207
208        let _result = self
209            .controller
210            .storage
211            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
212            .await;
213    }
214
215    pub(crate) fn commit_staged_batches(
216        &mut self,
217        conn_id: ConnectionId,
218        table_id: CatalogItemId,
219        batches: Vec<Result<ProtoBatch, String>>,
220    ) {
221        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
222            // Getting a successful response for a cancel COPY FROM is unexpected.
223            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
224            return;
225        };
226
227        let ActiveCopyFrom {
228            ingestion_id,
229            cluster_id: _,
230            table_id: _,
231            mut ctx,
232        } = active_copy;
233        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
234
235        let mut all_batches = SmallVec::with_capacity(batches.len());
236        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
237        let mut row_count = 0u64;
238
239        for maybe_batch in batches {
240            match maybe_batch {
241                Ok(batch) => {
242                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
243                    all_batches.push(batch);
244                    row_count = row_count.saturating_add(count);
245                }
246                Err(err) => all_errors.push(err),
247            }
248        }
249
250        // If we got any errors we need to fail the whole operation.
251        if let Some(error) = all_errors.pop() {
252            tracing::warn!(?error, ?all_errors, "failed COPY FROM");
253
254            // TODO(cf1): Cleanup the existing ProtoBatches to prevent leaking them.
255            // TODO(cf2): Carry structured errors all the way through.
256
257            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
258                "COPY FROM: {error}"
259            ))));
260
261            return;
262        }
263
264        // Stage a WriteOp, then when the Session is retired we complete the
265        // transaction, which handles acquiring the write lock for `table_id`,
266        // advancing the timestamps of the staged batches, and waiting for
267        // everything to complete before sending a response to the client.
268        let stage_write = ctx
269            .session_mut()
270            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
271                id: table_id,
272                rows: TableData::Batches(all_batches),
273            }]));
274
275        if let Err(err) = stage_write {
276            ctx.retire(Err(err));
277        } else {
278            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
279        }
280    }
281
282    /// Cancel any active `COPY FROM` statements/oneshot ingestions.
283    #[mz_ore::instrument(level = "debug")]
284    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
285        if let Some(ActiveCopyFrom {
286            ingestion_id,
287            cluster_id: _,
288            table_id: _,
289            ctx,
290        }) = self.active_copies.remove(conn_id)
291        {
292            let cancel_result = self
293                .controller
294                .storage
295                .cancel_oneshot_ingestion(ingestion_id);
296            if let Err(err) = cancel_result {
297                tracing::error!(?err, "failed to cancel OneshotIngestion");
298            }
299
300            ctx.retire(Err(AdapterError::Canceled));
301        }
302    }
303}