mz_adapter/coord/sequencer/inner/
copy_from.rs1use std::str::FromStr;
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_ore::cast::CastInto;
14use mz_persist_client::batch::ProtoBatch;
15use mz_pgcopy::CopyFormatParams;
16use mz_repr::{CatalogItemId, Datum, NotNullViolation, RowArena};
17use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_storage_client::client::TableData;
20use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
21use smallvec::SmallVec;
22use url::Url;
23use uuid::Uuid;
24
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
27use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
28use crate::session::{TransactionOps, WriteOp};
29use crate::{AdapterError, ExecuteContext, ExecuteResponse};
30
31impl Coordinator {
32 pub(crate) async fn sequence_copy_from(
33 &mut self,
34 ctx: ExecuteContext,
35 plan: plan::CopyFromPlan,
36 target_cluster: TargetCluster,
37 ) {
38 let plan::CopyFromPlan {
39 target_name: _,
40 target_id,
41 source,
42 columns: _,
43 source_desc,
44 mfp,
45 params,
46 filter,
47 } = plan;
48
49 let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
50 let style = ExprPrepOneShot {
51 logical_time: EvalTime::NotAvailable,
52 session: ctx.session(),
53 catalog_state: self.catalog().state(),
54 };
55 let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
56 style.prep_scalar_expr(&mut from)?;
57
58 let temp_storage = RowArena::new();
61 let eval_result = from.eval(&[], &temp_storage)?;
62 let eval_string = match eval_result {
63 Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
64 Datum::String(url_str) => url_str,
65 other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
66 };
67
68 Ok(eval_string.to_string())
69 };
70
71 let Some(dest_table) = self.catalog().get_entry(&target_id).table() else {
73 let typ = self.catalog().get_entry(&target_id).item().typ();
74 let msg = format!("programming error: expected a Table found {typ:?}");
75 return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
76 };
77
78 let ingestion_id = Uuid::new_v4();
80 let collection_id = dest_table.global_id_writes();
81
82 let format = match params {
83 CopyFormatParams::Csv(csv) => {
84 mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
85 }
86 CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
87 CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
88 mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
89 ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
90 return;
91 }
92 };
93
94 let source = match source {
95 CopyFromSource::Url(from_expr) => {
96 let url = return_if_err!(eval_uri(from_expr), ctx);
97 let result = Url::parse(&url)
99 .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
100 let url = return_if_err!(result, ctx);
101
102 mz_storage_types::oneshot_sources::ContentSource::Http { url }
103 }
104 CopyFromSource::AwsS3 {
105 uri,
106 connection,
107 connection_id,
108 } => {
109 let uri = return_if_err!(eval_uri(uri), ctx);
110
111 let result = http::Uri::from_str(&uri)
116 .map_err(|err| {
117 AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
118 })
119 .and_then(|uri| {
120 if uri.scheme_str() != Some("s3") {
121 coord_bail!("only 's3://...' urls are supported as COPY FROM target");
122 }
123 Ok(uri)
124 })
125 .and_then(|uri| {
126 if uri.host().is_none() {
127 coord_bail!("missing bucket name from 's3://...' url");
128 }
129 Ok(uri)
130 });
131 let uri = return_if_err!(result, ctx);
132
133 mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
134 connection,
135 connection_id,
136 uri: uri.to_string(),
137 }
138 }
139 CopyFromSource::Stdin => {
140 unreachable!("COPY FROM STDIN should be handled elsewhere")
141 }
142 };
143
144 let filter = match filter {
145 None => mz_storage_types::oneshot_sources::ContentFilter::None,
146 Some(CopyFromFilter::Files(files)) => {
147 mz_storage_types::oneshot_sources::ContentFilter::Files(files)
148 }
149 Some(CopyFromFilter::Pattern(pattern)) => {
150 mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
151 }
152 };
153
154 let source_mfp = mfp
155 .into_plan()
156 .map_err(|s| AdapterError::internal("copy_from", s))
157 .and_then(|mfp| {
158 mfp.into_nontemporal().map_err(|_| {
159 AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
160 })
161 });
162 let source_mfp = return_if_err!(source_mfp, ctx);
163
164 let target_desc = dest_table.desc.latest();
166 for (col_idx, col_type) in target_desc.iter_types().enumerate() {
167 if !col_type.nullable {
168 if let Some(&projection_idx) = source_mfp.projection.get(col_idx) {
170 let input_arity = source_mfp.input_arity;
172 if projection_idx >= input_arity {
173 let expr_idx = projection_idx - input_arity;
174 if let Some(expr) = source_mfp.expressions.get(expr_idx) {
175 if matches!(
178 expr,
179 mz_expr::MirScalarExpr::Literal(Ok(row), _)
180 if row.iter().next().map(|d| d.is_null()).unwrap_or(false)
181 ) {
182 let col_name = target_desc.get_name(col_idx);
183 return ctx.retire(Err(AdapterError::ConstraintViolation(
184 NotNullViolation(col_name.clone()),
185 )));
186 }
187 }
188 }
189 } else {
190 let col_name = target_desc.get_name(col_idx);
192 return ctx.retire(Err(AdapterError::ConstraintViolation(NotNullViolation(
193 col_name.clone(),
194 ))));
195 }
196 }
197 }
198
199 let shape = ContentShape {
200 source_desc,
201 source_mfp,
202 };
203
204 let request = OneshotIngestionRequest {
205 source,
206 format,
207 filter,
208 shape,
209 };
210
211 let target_cluster = match self
212 .catalog()
213 .resolve_target_cluster(target_cluster, ctx.session())
214 {
215 Ok(cluster) => cluster,
216 Err(err) => {
217 return ctx.retire(Err(err));
218 }
219 };
220 let cluster_id = target_cluster.id;
221
222 let command_tx = self.internal_cmd_tx.clone();
225 let conn_id = ctx.session().conn_id().clone();
226 let closure = Box::new(move |batches| {
227 let _ = command_tx.send(crate::coord::Message::StagedBatches {
228 conn_id,
229 table_id: target_id,
230 batches,
231 });
232 });
233 let conn_id = ctx.session().conn_id().clone();
235 self.active_copies.insert(
236 conn_id,
237 ActiveCopyFrom {
238 ingestion_id,
239 cluster_id,
240 table_id: target_id,
241 ctx,
242 },
243 );
244
245 let _result = self
246 .controller
247 .storage
248 .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
249 .await;
250 }
251
252 pub(crate) fn commit_staged_batches(
253 &mut self,
254 conn_id: ConnectionId,
255 table_id: CatalogItemId,
256 batches: Vec<Result<ProtoBatch, String>>,
257 ) {
258 let Some(active_copy) = self.active_copies.remove(&conn_id) else {
259 tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
261 return;
262 };
263
264 let ActiveCopyFrom {
265 ingestion_id,
266 cluster_id: _,
267 table_id: _,
268 mut ctx,
269 } = active_copy;
270 tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
271
272 let mut all_batches = SmallVec::with_capacity(batches.len());
273 let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
274 let mut row_count = 0u64;
275
276 for maybe_batch in batches {
277 match maybe_batch {
278 Ok(batch) => {
279 let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
280 all_batches.push(batch);
281 row_count = row_count.saturating_add(count);
282 }
283 Err(err) => all_errors.push(err),
284 }
285 }
286
287 if let Some(error) = all_errors.pop() {
289 tracing::warn!(?error, ?all_errors, "failed COPY FROM");
290
291 ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
295 "COPY FROM: {error}"
296 ))));
297
298 return;
299 }
300
301 let stage_write = ctx
306 .session_mut()
307 .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
308 id: table_id,
309 rows: TableData::Batches(all_batches),
310 }]));
311
312 if let Err(err) = stage_write {
313 ctx.retire(Err(err));
314 } else {
315 ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
316 }
317 }
318
319 #[mz_ore::instrument(level = "debug")]
321 pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
322 if let Some(ActiveCopyFrom {
323 ingestion_id,
324 cluster_id: _,
325 table_id: _,
326 ctx,
327 }) = self.active_copies.remove(conn_id)
328 {
329 let cancel_result = self
330 .controller
331 .storage
332 .cancel_oneshot_ingestion(ingestion_id);
333 if let Err(err) = cancel_result {
334 tracing::error!(?err, "failed to cancel OneshotIngestion");
335 }
336
337 ctx.retire(Err(AdapterError::Canceled));
338 }
339 }
340}