mz_adapter/coord/sequencer/inner/copy_from.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;

use mz_adapter_types::connection::ConnectionId;
use mz_ore::cast::CastInto;
use mz_persist_client::batch::ProtoBatch;
use mz_pgcopy::CopyFormatParams;
use mz_repr::{CatalogItemId, Datum, RowArena};
use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
use smallvec::SmallVec;
use url::Url;
use uuid::Uuid;

use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
use crate::session::{TransactionOps, WriteOp};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};

impl Coordinator {
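    /// Sequences a `COPY FROM` statement whose source is a URL or an S3 bucket.
    ///
    /// Builds a `OneshotIngestionRequest` and hands it off to the storage controller, which
    /// stages the source data as batches in Persist on the target cluster. Once staging
    /// completes, the closure registered here sends a `Message::StagedBatches` back to the
    /// Coordinator, and the result is committed by `commit_staged_batches`.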
    pub(crate) async fn sequence_copy_from(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CopyFromPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::CopyFromPlan {
            target_name: _,
            target_id,
            source,
            columns: _,
            source_desc,
            mfp,
            params,
            filter,
        } = plan;

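        // Evaluate the source expression to a URI string. The expression must be uncorrelated
        // and is evaluated immediately, without a logical timestamp.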
        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
            let style = ExprPrepStyle::OneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut from = from.lower_uncorrelated()?;
            prep_scalar_expr(&mut from, style)?;

            // TODO(cf3): Add structured errors for the below uses of `coord_bail!`
            // and AdapterError::Unstructured.
            let temp_storage = RowArena::new();
            let eval_result = from.eval(&[], &temp_storage)?;
            let eval_string = match eval_result {
                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
                Datum::String(url_str) => url_str,
                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
            };

            Ok(eval_string.to_string())
        };

        // We check in planning that we're copying into a Table, but be defensive.
        let Some(dest_table) = self.catalog().get_entry(&target_id).table() else {
            let typ = self.catalog().get_entry(&target_id).item().typ();
            let msg = format!("programming error: expected a Table found {typ:?}");
            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
        };

        // Generate a unique UUID for our ingestion.
        let ingestion_id = Uuid::new_v4();
        let collection_id = dest_table.global_id_writes();

        let format = match params {
            CopyFormatParams::Csv(csv) => {
                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
            }
            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
                return;
            }
        };

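        // Resolve where the data is coming from: either a URL fetched over HTTP or a location
        // in S3. `COPY FROM STDIN` is sequenced through a different code path.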
        let source = match source {
            CopyFromSource::Url(from_expr) => {
                let url = return_if_err!(eval_uri(from_expr), ctx);
                // TODO(cf2): Structured errors.
                let result = Url::parse(&url)
                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
                let url = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::Http { url }
            }
            CopyFromSource::AwsS3 {
                uri,
                connection,
                connection_id,
            } => {
                let uri = return_if_err!(eval_uri(uri), ctx);

                // Validate that the URI is an S3 URI with a bucket name. We rely on this
                // validation happening here; clusterd expects an already-validated URI.
                //
                // TODO(cf2): Structured errors.
                let result = http::Uri::from_str(&uri)
                    .map_err(|err| {
                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
                    })
                    .and_then(|uri| {
                        if uri.scheme_str() != Some("s3") {
                            coord_bail!("only 's3://...' urls are supported as COPY FROM target");
                        }
                        Ok(uri)
                    })
                    .and_then(|uri| {
                        if uri.host().is_none() {
                            coord_bail!("missing bucket name from 's3://...' url");
                        }
                        Ok(uri)
                    });
                let uri = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
                    connection,
                    connection_id,
                    uri: uri.to_string(),
                }
            }
            CopyFromSource::Stdin => {
                unreachable!("COPY FROM STDIN should be handled elsewhere")
            }
        };

        let filter = match filter {
            None => mz_storage_types::oneshot_sources::ContentFilter::None,
            Some(CopyFromFilter::Files(files)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
            }
            Some(CopyFromFilter::Pattern(pattern)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
            }
        };

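        // Lower the map/filter/project that shapes the source data into a non-temporal plan;
        // a temporal MFP is not allowed in `COPY FROM`.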
        let source_mfp = mfp
            .into_plan()
            .map_err(|s| AdapterError::internal("copy_from", s))
            .and_then(|mfp| {
                mfp.into_nontemporal().map_err(|_| {
                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
                })
            });
        let source_mfp = return_if_err!(source_mfp, ctx);
        let shape = ContentShape {
            source_desc,
            source_mfp,
        };

        let request = OneshotIngestionRequest {
            source,
            format,
            filter,
            shape,
        };

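        // Resolve the cluster that will run the oneshot ingestion.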
        let target_cluster = match self
            .catalog()
            .resolve_target_cluster(target_cluster, ctx.session())
        {
            Ok(cluster) => cluster,
            Err(err) => {
                return ctx.retire(Err(err));
            }
        };
        let cluster_id = target_cluster.id;

        // When we finish staging the Batches in Persist, we'll send a command
        // to the Coordinator.
        let command_tx = self.internal_cmd_tx.clone();
        let conn_id = ctx.session().conn_id().clone();
        let closure = Box::new(move |batches| {
            let _ = command_tx.send(crate::coord::Message::StagedBatches {
                conn_id,
                table_id: target_id,
                batches,
            });
        });
        // Stash the execute context so we can cancel the COPY.
        let conn_id = ctx.session().conn_id().clone();
        self.active_copies.insert(
            conn_id,
            ActiveCopyFrom {
                ingestion_id,
                cluster_id,
                table_id: target_id,
                ctx,
            },
        );

        let _result = self
            .controller
            .storage
            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
            .await;
    }

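    /// Commits batches that a `COPY FROM` oneshot ingestion staged in Persist.
    ///
    /// Handles the `Message::StagedBatches` sent by the closure registered in
    /// `sequence_copy_from`. If any batch failed, the whole `COPY FROM` is failed; otherwise
    /// the staged batches are added to the session's transaction as a write against `table_id`.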
    pub(crate) fn commit_staged_batches(
        &mut self,
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    ) {
        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
            // Getting a successful response for a canceled COPY FROM is unexpected.
            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
            return;
        };

        let ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            mut ctx,
        } = active_copy;
        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");

        let mut all_batches = SmallVec::with_capacity(batches.len());
        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
        let mut row_count = 0u64;

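        // Split the results into successfully staged batches and errors, keeping a running
        // count of how many rows were staged.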
        for maybe_batch in batches {
            match maybe_batch {
                Ok(batch) => {
                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
                    all_batches.push(batch);
                    row_count = row_count.saturating_add(count);
                }
                Err(err) => all_errors.push(err),
            }
        }

        // If we got any errors, we need to fail the whole operation.
        if let Some(error) = all_errors.pop() {
            tracing::warn!(?error, ?all_errors, "failed COPY FROM");

            // TODO(cf1): Clean up the existing ProtoBatches to prevent leaking them.
            // TODO(cf2): Carry structured errors all the way through.

            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
                "COPY FROM: {error}"
            ))));

            return;
        }

        // Stage a WriteOp. When the Session is retired we complete the transaction, which
        // handles acquiring the write lock for `table_id`, advancing the timestamps of the
        // staged batches, and waiting for everything to complete before sending a response to
        // the client.
        let stage_write = ctx
            .session_mut()
            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
                id: table_id,
                rows: TableData::Batches(all_batches),
            }]));

        if let Err(err) = stage_write {
            ctx.retire(Err(err));
        } else {
            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
        }
    }

    /// Cancel any active `COPY FROM` statements/oneshot ingestions.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
        if let Some(ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            ctx,
        }) = self.active_copies.remove(conn_id)
        {
            let cancel_result = self
                .controller
                .storage
                .cancel_oneshot_ingestion(ingestion_id);
            if let Err(err) = cancel_result {
                tracing::error!(?err, "failed to cancel OneshotIngestion");
            }

            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}