1use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_adapter_types::connection::ConnectionId;
14use mz_ore::cast::CastInto;
15use mz_persist_client::Diagnostics;
16use mz_persist_client::batch::ProtoBatch;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::{CatalogItemId, ColumnIndex, Datum, RelationDesc, Row, RowArena};
20use mz_sql::catalog::SessionCatalog;
21use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
22use mz_sql::session::metadata::SessionMetadata;
23use mz_storage_client::client::TableData;
24use mz_storage_types::StorageDiff;
25use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
26use mz_storage_types::sources::SourceData;
27use smallvec::SmallVec;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use url::Url;
31use uuid::Uuid;
32
33use crate::command::CopyFromStdinWriter;
34use crate::coord::sequencer::inner::return_if_err;
35use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
36use crate::optimize;
37use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
38use crate::session::{Session, TransactionOps, WriteOp};
39use crate::{AdapterError, ExecuteContext, ExecuteResponse};
40
/// Number of raw input bytes a `COPY FROM STDIN` batch-builder worker accumulates
/// before finalizing its current persist batch and starting a new one (32 MiB).
const COPY_FROM_STDIN_MAX_BATCH_BYTES: usize = 32 * 1024 * 1024;
44
impl Coordinator {
    /// Sequences a `COPY ... FROM` whose source is an HTTP(S) URL or an
    /// S3/GCS bucket by submitting a oneshot ingestion request to the storage
    /// controller.
    ///
    /// The `ExecuteContext` is stashed in `self.active_copies`; the statement
    /// is retired asynchronously, either when the staged batches come back
    /// (see [`Coordinator::commit_staged_batches`]) or when the copy is
    /// canceled (see [`Coordinator::cancel_pending_copy`]).
    ///
    /// `COPY FROM STDIN` is handled elsewhere; reaching this method with a
    /// stdin source is a bug.
    pub(crate) async fn sequence_copy_from(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CopyFromPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::CopyFromPlan {
            target_name: _,
            target_id,
            source,
            columns: _,
            source_desc,
            mfp,
            params,
            filter,
        } = plan;

        // Lower and evaluate the scalar expression that names the source
        // (e.g. the URL), yielding a concrete string. NULL and non-string
        // results are rejected.
        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
            let style = ExprPrepOneShot {
                // No logical timestamp is available (or needed) here; the
                // expression must be evaluable without reading collections.
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
            style.prep_scalar_expr(&mut from)?;

            let temp_storage = RowArena::new();
            let eval_result = from.eval(&[], &temp_storage)?;
            let eval_string = match eval_result {
                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
                Datum::String(url_str) => url_str,
                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
            };

            Ok(eval_string.to_string())
        };

        // The target table may have been dropped concurrently since planning.
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return ctx.retire(Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            }));
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            let msg = format!("programming error: expected a Table found {typ:?}");
            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
        };

        // Fresh ID for this ingestion; used later to cancel it if needed.
        let ingestion_id = Uuid::new_v4();
        let collection_id = dest_table.global_id_writes();

        let format = match params {
            CopyFormatParams::Csv(csv) => {
                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
            }
            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
                // Planning should never hand us these formats for URL/S3 copies.
                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
                return;
            }
        };

        let source = match source {
            CopyFromSource::Url(from_expr) => {
                let url = return_if_err!(eval_uri(from_expr), ctx);
                let result = Url::parse(&url)
                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
                let url = return_if_err!(result, ctx);

                match url.scheme() {
                    "http" | "https" => {}
                    other => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "only 'http://' and 'https://' urls are supported as COPY FROM \
                            target, got '{other}://'"
                        ))));
                    }
                }
                // Optionally reject URLs that resolve to non-global (e.g.
                // private/loopback) addresses, per the storage dyncfg.
                let enforce_external_addresses =
                    mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                        .get(self.controller.storage.config().config_set());
                if enforce_external_addresses {
                    if let Err(err) = mz_ore::netio::ensure_url_ip_global(&url) {
                        return ctx
                            .retire(Err(AdapterError::Unstructured(anyhow::anyhow!("{err}"))));
                    }
                }
                mz_storage_types::oneshot_sources::ContentSource::Http { url }
            }
            CopyFromSource::AwsS3 {
                uri,
                connection,
                connection_id,
            } => {
                let uri = return_if_err!(eval_uri(uri), ctx);

                // Validate scheme ('s3' or 'gs') and the presence of a bucket
                // (the URI host component) before handing off to storage.
                let result = http::Uri::from_str(&uri)
                    .map_err(|err| {
                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
                    })
                    .and_then(|uri| {
                        if uri.scheme_str() != Some("s3") && uri.scheme_str() != Some("gs") {
                            coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY FROM target");
                        }
                        Ok(uri)
                    })
                    .and_then(|uri| {
                        if uri.host().is_none() {
                            coord_bail!("missing bucket name from 's3://...' url");
                        }
                        Ok(uri)
                    });
                let uri = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
                    connection,
                    connection_id,
                    uri: uri.to_string(),
                }
            }
            CopyFromSource::Stdin => {
                unreachable!("COPY FROM STDIN should be handled elsewhere")
            }
        };

        let filter = match filter {
            None => mz_storage_types::oneshot_sources::ContentFilter::None,
            Some(CopyFromFilter::Files(files)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
            }
            Some(CopyFromFilter::Pattern(pattern)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
            }
        };

        // The MFP maps the decoded source shape onto the table's shape. It
        // must be non-temporal: there is no timeline to evaluate `mz_now()`
        // against in a oneshot ingestion.
        let source_mfp = mfp
            .into_plan()
            .map_err(|s| AdapterError::internal("copy_from", s))
            .and_then(|mfp| {
                mfp.into_nontemporal().map_err(|_| {
                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
                })
            });
        let source_mfp = return_if_err!(source_mfp, ctx);

        let shape = ContentShape {
            source_desc,
            source_mfp,
        };

        let request = OneshotIngestionRequest {
            source,
            format,
            filter,
            shape,
        };

        let target_cluster = match self
            .catalog()
            .resolve_target_cluster(target_cluster, ctx.session())
        {
            Ok(cluster) => cluster,
            Err(err) => {
                return ctx.retire(Err(err));
            }
        };
        let cluster_id = target_cluster.id;

        // When the ingestion finishes staging batches, route them back to the
        // coordinator as a `StagedBatches` message for this connection.
        let command_tx = self.internal_cmd_tx.clone();
        let conn_id = ctx.session().conn_id().clone();
        let closure = Box::new(move |batches| {
            let _ = command_tx.send(crate::coord::Message::StagedBatches {
                conn_id,
                table_id: target_id,
                batches,
            });
        });
        // Stash the context so `commit_staged_batches` / `cancel_pending_copy`
        // can retire the statement later.
        let conn_id = ctx.session().conn_id().clone();
        self.active_copies.insert(
            conn_id,
            ActiveCopyFrom {
                ingestion_id,
                cluster_id,
                table_id: target_id,
                ctx,
            },
        );

        // Result is intentionally ignored: failures are reported back through
        // the staged-batches callback path.
        let _result = self
            .controller
            .storage
            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
            .await;
    }

    /// Sets up the machinery for `COPY FROM STDIN` into `target_id`.
    ///
    /// Spawns one blocking batch-builder worker per available CPU, each fed
    /// raw input chunks through its own bounded channel, plus a collector task
    /// that awaits all workers and reports the combined staged batches (and
    /// row count) through a oneshot channel. Returns a [`CopyFromStdinWriter`]
    /// holding the per-worker senders and the completion receiver.
    ///
    /// When the user specified a column list that is not exactly the table's
    /// columns in order, a [`ColumnTransform`] is precomputed here: we plan a
    /// copy-from over a single dummy row and constant-fold it to obtain the
    /// default datums for the unspecified columns.
    pub(crate) fn setup_copy_from_stdin(
        &self,
        session: &Session,
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: RelationDesc,
        params: CopyFormatParams<'static>,
    ) -> Result<CopyFromStdinWriter, AdapterError> {
        // The target table may have been dropped concurrently since planning.
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            });
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            return Err(AdapterError::Unstructured(anyhow::anyhow!(
                "programming error: expected a Table found {typ:?}"
            )));
        };
        let collection_id = dest_table.global_id_writes();

        let collection_meta = self
            .controller
            .storage
            .collection_metadata(collection_id)
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("{e}")))?;
        let shard_id = collection_meta.data_shard;
        let collection_desc = collection_meta.relation_desc.clone();

        let pcx = session.pcx().clone();
        let session_meta = session.meta();
        let catalog = self.catalog().clone();
        let conn_catalog = catalog.for_session(session);
        let catalog_state = conn_catalog.state();
        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());

        let target_desc = catalog
            .try_get_entry(&target_id)
            .expect("table must exist")
            .relation_desc_latest()
            .expect("table has desc")
            .into_owned();
        // If the user listed every column in table order, rows can be written
        // through unchanged and no transform is needed.
        let all_columns_in_order = columns.len() == target_desc.arity()
            && columns.iter().enumerate().all(|(i, c)| c.to_raw() == i);

        let column_transform = if all_columns_in_order {
            None
        } else {
            // Plan a copy-from over one all-NULL dummy row; the optimizer
            // constant-folds it into a full-width row whose non-input columns
            // carry the evaluated default values.
            let dummy_datums: Vec<Datum> = columns.iter().map(|_| Datum::Null).collect();
            let dummy_row = Row::pack(&dummy_datums);

            let prep = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: &session_meta,
                catalog_state,
            };
            let mut optimizer = optimize::view::Optimizer::new_with_prep_no_limit(
                optimizer_config.clone(),
                None,
                prep,
            );

            let hir = mz_sql::plan::plan_copy_from(
                &pcx,
                &conn_catalog,
                target_id,
                target_name.clone(),
                columns.clone(),
                vec![dummy_row],
            )?;
            let mir = optimize::Optimize::optimize(&mut optimizer, hir)?;
            let mir_expr = mir.into_inner();
            let (result_ref, _) = mir_expr
                .as_const()
                .expect("optimizer should produce constant");
            let result_rows = result_ref
                .clone()
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("eval error: {e}")))?;

            let (full_row, _) = result_rows.into_iter().next().expect("should have one row");
            let full_datums: Vec<Datum> = full_row.unpack();

            // Map each user-specified column to its position in the decoded
            // input row.
            let col_to_source: std::collections::BTreeMap<ColumnIndex, usize> =
                columns.iter().enumerate().map(|(a, b)| (*b, a)).collect();

            let mut sources: Vec<ColumnSource> = Vec::with_capacity(target_desc.arity());
            let mut default_datums: Vec<Datum> = Vec::new();

            for i in 0..target_desc.arity() {
                let col_idx = ColumnIndex::from_raw(i);
                if let Some(&src_idx) = col_to_source.get(&col_idx) {
                    sources.push(ColumnSource::Input(src_idx));
                } else {
                    sources.push(ColumnSource::Default(default_datums.len()));
                    default_datums.push(full_datums[i]);
                }
            }

            let defaults_row = Row::pack(&default_datums);

            Some(ColumnTransform {
                sources,
                defaults_row,
            })
        };

        let column_types: Arc<[mz_pgrepr::Type]> = row_desc
            .typ()
            .column_types
            .iter()
            .map(|x| &x.scalar_type)
            .map(mz_pgrepr::Type::from)
            .collect::<Vec<_>>()
            .into();

        let num_workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        tracing::info!(
            %target_id, num_workers,
            "starting parallel COPY FROM STDIN batch builders"
        );

        let column_transform = Arc::new(column_transform);
        let target_desc = Arc::new(target_desc);
        let collection_desc = Arc::new(collection_desc);
        let persist_client = self.persist_client.clone();

        let rt_handle = tokio::runtime::Handle::current();
        let mut batch_txs = Vec::with_capacity(num_workers);
        let mut worker_handles = Vec::with_capacity(num_workers);

        // Only the very first chunk of input can contain a header line, and
        // only worker 0 receives it; clear the header flag for everyone else
        // so they don't swallow a data row as a header.
        let first_chunk_has_header = params.requires_header();
        let mut worker_params = params;
        if let CopyFormatParams::Csv(ref mut csv) = worker_params {
            csv.header = false;
        }

        for worker_id in 0..num_workers {
            // Bounded at 1 so senders apply backpressure per worker.
            let (batch_tx, batch_rx) = mpsc::channel::<Vec<u8>>(1);
            batch_txs.push(batch_tx);

            let persist_client = persist_client.clone();
            let column_types = Arc::clone(&column_types);
            let column_transform = Arc::clone(&column_transform);
            let target_desc = Arc::clone(&target_desc);
            let collection_desc = Arc::clone(&collection_desc);
            let params = worker_params.clone();
            let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
            let rt = rt_handle.clone();

            // Decoding is CPU-bound, so run it on a blocking thread and
            // re-enter the runtime only for the async persist calls.
            let handle = mz_ore::task::spawn_blocking(
                || format!("copy_from_stdin_worker:{target_id}:{worker_id}"),
                move || {
                    rt.block_on(Self::copy_from_stdin_batch_builder(
                        persist_client,
                        shard_id,
                        collection_id,
                        collection_desc,
                        target_desc,
                        column_transform,
                        column_types,
                        params,
                        skip_header_on_first_chunk,
                        batch_rx,
                    ))
                },
            );
            worker_handles.push(handle);
        }

        // Collector: await every worker, accumulate batches and row counts,
        // and deliver the combined result (or the first failure) once.
        let (completion_tx, completion_rx) = oneshot::channel();
        mz_ore::task::spawn(
            || format!("copy_from_stdin_collector:{target_id}"),
            async move {
                let mut all_batches = Vec::with_capacity(num_workers);
                let mut total_rows: u64 = 0;

                for handle in worker_handles {
                    match handle.await {
                        Ok((proto_batches, count)) => {
                            all_batches.extend(proto_batches);
                            total_rows += count;
                        }
                        Err(e) => {
                            let _ = completion_tx.send(Err(e));
                            return;
                        }
                    }
                }

                let _ = completion_tx.send(Ok((all_batches, total_rows)));
            },
        );

        Ok(CopyFromStdinWriter {
            batch_txs,
            completion_rx,
        })
    }

    /// Worker loop for one `COPY FROM STDIN` batch builder.
    ///
    /// Receives raw input chunks on `batch_rx`, decodes them with
    /// `mz_pgcopy::decode_copy_format`, applies the optional column transform,
    /// checks column constraints, and appends the rows to persist batch
    /// builders for `shard_id`. A batch is finalized roughly every
    /// [`COPY_FROM_STDIN_MAX_BATCH_BYTES`] of raw input. Returns the
    /// transmittable batches plus the total number of rows staged.
    async fn copy_from_stdin_batch_builder(
        persist_client: mz_persist_client::PersistClient,
        shard_id: mz_persist_client::ShardId,
        collection_id: mz_repr::GlobalId,
        collection_desc: Arc<RelationDesc>,
        target_desc: Arc<RelationDesc>,
        column_transform: Arc<Option<ColumnTransform>>,
        column_types: Arc<[mz_pgrepr::Type]>,
        params: CopyFormatParams<'static>,
        skip_header_on_first_chunk: bool,
        mut batch_rx: mpsc::Receiver<Vec<u8>>,
    ) -> Result<(Vec<ProtoBatch>, u64), AdapterError> {
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFromStdin::batch_builder".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                collection_desc,
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist open: {e}")))?;

        // Batches are staged at the minimum timestamp; the coordinator
        // re-times them when it actually appends them to the table.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());
        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
        let mut row_count: u64 = 0;
        let mut row_count_in_batch: u64 = 0;
        let mut batch_bytes: usize = 0;
        let mut proto_batches = Vec::new();

        let mut is_first_chunk = true;
        while let Some(raw_bytes) = batch_rx.recv().await {
            // `setup_copy_from_stdin` cleared the CSV header flag for all
            // workers; re-enable it for this worker's first chunk only, so
            // the decoder consumes the header row there and nowhere else.
            let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
                let mut p = params.clone();
                if let CopyFormatParams::Csv(ref mut csv) = p {
                    csv.header = true;
                }
                p
            } else {
                params.clone()
            };
            is_first_chunk = false;
            let rows = mz_pgcopy::decode_copy_format(&raw_bytes, &column_types, chunk_params)
                .map_err(|e| AdapterError::CopyFormatError(e.to_string()))?;

            for row in rows {
                // Widen the decoded row to the table's full arity, filling
                // unspecified columns with their defaults, if required.
                let full_row = if let Some(ref transform) = *column_transform {
                    transform.apply(&row)
                } else {
                    row
                };

                for (i, datum) in full_row.iter().enumerate() {
                    target_desc.constraints_met(i, &datum).map_err(|e| {
                        AdapterError::Unstructured(anyhow::anyhow!("constraint violation: {e}"))
                    })?;
                }

                let data = SourceData(Ok(full_row));
                batch_builder
                    .add(&data, &(), &lower, &1)
                    .await
                    .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist add: {e}")))?;
                row_count += 1;
                row_count_in_batch += 1;
            }

            // Cut a new persist batch once enough raw input has accumulated,
            // so no single batch grows unboundedly.
            batch_bytes = batch_bytes.saturating_add(raw_bytes.len());
            if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
                let batch = batch_builder.finish(upper.clone()).await.map_err(|e| {
                    AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}"))
                })?;
                proto_batches.push(batch.into_transmittable_batch());

                batch_builder = write_handle.builder(Antichain::from_elem(lower));
                row_count_in_batch = 0;
                batch_bytes = 0;
            }
        }

        // Flush the trailing partial batch; also emit an (empty) batch when
        // no batch was produced at all, so callers always get at least one.
        if row_count_in_batch > 0 || proto_batches.is_empty() {
            let batch = batch_builder
                .finish(upper)
                .await
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}")))?;
            proto_batches.push(batch.into_transmittable_batch());
        }

        Ok((proto_batches, row_count))
    }

    /// Handles a `Message::StagedBatches` for a previously sequenced
    /// `COPY FROM`: stages the batches as transaction write ops on the
    /// session and retires the stashed `ExecuteContext`.
    ///
    /// If the connection's entry is gone from `active_copies` the copy was
    /// canceled and the batches are dropped. If any batch carries an error,
    /// the statement is retired with that error instead.
    pub(crate) fn commit_staged_batches(
        &mut self,
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    ) {
        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
            return;
        };

        let ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            mut ctx,
        } = active_copy;
        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");

        let mut all_batches = SmallVec::with_capacity(batches.len());
        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
        let mut row_count = 0u64;

        for maybe_batch in batches {
            match maybe_batch {
                Ok(batch) => {
                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
                    all_batches.push(batch);
                    row_count = row_count.saturating_add(count);
                }
                Err(err) => all_errors.push(err),
            }
        }

        // Report one error to the user; the rest are logged above.
        if let Some(error) = all_errors.pop() {
            tracing::warn!(?error, ?all_errors, "failed COPY FROM");

            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
                "COPY FROM: {error}"
            ))));

            return;
        }

        // Stage the batches on the session's transaction; they are committed
        // with the transaction, not here.
        let stage_write = ctx
            .session_mut()
            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
                id: table_id,
                rows: TableData::Batches(all_batches),
            }]));

        if let Err(err) = stage_write {
            ctx.retire(Err(err));
        } else {
            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
        }
    }

    /// Cancels an in-flight `COPY FROM` for `conn_id`, if any: tells the
    /// storage controller to cancel the oneshot ingestion (failure is only
    /// logged) and retires the stashed statement with `AdapterError::Canceled`.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
        if let Some(ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            ctx,
        }) = self.active_copies.remove(conn_id)
        {
            let cancel_result = self
                .controller
                .storage
                .cancel_oneshot_ingestion(ingestion_id);
            if let Err(err) = cancel_result {
                tracing::error!(?err, "failed to cancel OneshotIngestion");
            }

            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}
684
/// Maps a decoded input row (containing only the user-specified columns) onto
/// a full-width table row, filling unspecified columns from precomputed
/// default datums.
struct ColumnTransform {
    // For each output column position, where its datum comes from.
    sources: Vec<ColumnSource>,
    // Packed row of evaluated default datums, indexed by `ColumnSource::Default`.
    defaults_row: Row,
}
694
/// Origin of one output column's datum in a `ColumnTransform`.
enum ColumnSource {
    /// Take the datum at this index of the decoded input row.
    Input(usize),
    /// Take the datum at this index of `ColumnTransform::defaults_row`.
    Default(usize),
}
701
702impl ColumnTransform {
703 fn apply(&self, input: &Row) -> Row {
705 let input_datums: Vec<Datum> = input.unpack();
706 let default_datums: Vec<Datum> = self.defaults_row.unpack();
707 let mut output_datums = Vec::with_capacity(self.sources.len());
708 for source in &self.sources {
709 match source {
710 ColumnSource::Input(idx) => output_datums.push(input_datums[*idx]),
711 ColumnSource::Default(idx) => output_datums.push(default_datums[*idx]),
712 }
713 }
714 Row::pack(&output_datums)
715 }
716}