1use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_adapter_types::connection::ConnectionId;
14use mz_expr::Eval;
15use mz_ore::cast::CastInto;
16use mz_persist_client::Diagnostics;
17use mz_persist_client::batch::ProtoBatch;
18use mz_persist_types::codec_impls::UnitSchema;
19use mz_pgcopy::CopyFormatParams;
20use mz_repr::{CatalogItemId, ColumnIndex, Datum, RelationDesc, Row, RowArena};
21use mz_sql::catalog::SessionCatalog;
22use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_storage_client::client::TableData;
25use mz_storage_types::StorageDiff;
26use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
27use mz_storage_types::sources::SourceData;
28use smallvec::SmallVec;
29use timely::progress::Antichain;
30use tokio::sync::{mpsc, oneshot};
31use url::Url;
32use uuid::Uuid;
33
34use crate::command::CopyFromStdinWriter;
35use crate::coord::sequencer::inner::return_if_err;
36use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
37use crate::optimize;
38use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
39use crate::session::{Session, TransactionOps, WriteOp};
40use crate::{AdapterError, ExecuteContext, ExecuteResponse};
41
/// Threshold (32 MiB), measured in bytes of raw `COPY FROM STDIN` input consumed
/// by one batch-builder worker, at which the worker finishes its current persist
/// batch and starts a fresh one.
const COPY_FROM_STDIN_MAX_BATCH_BYTES: usize = 32 << 20;
45
46impl Coordinator {
47 pub(crate) async fn sequence_copy_from(
48 &mut self,
49 ctx: ExecuteContext,
50 plan: plan::CopyFromPlan,
51 target_cluster: TargetCluster,
52 ) {
53 let plan::CopyFromPlan {
54 target_name: _,
55 target_id,
56 source,
57 columns: _,
58 source_desc,
59 mfp,
60 params,
61 filter,
62 } = plan;
63
64 let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
65 let style = ExprPrepOneShot {
66 logical_time: EvalTime::NotAvailable,
67 session: ctx.session(),
68 catalog_state: self.catalog().state(),
69 };
70 let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
71 style.prep_scalar_expr(&mut from)?;
72
73 let temp_storage = RowArena::new();
76 let eval_result = from.eval(&[], &temp_storage)?;
77 let eval_string = match eval_result {
78 Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
79 Datum::String(url_str) => url_str,
80 other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
81 };
82
83 Ok(eval_string.to_string())
84 };
85
86 let Some(entry) = self.catalog().try_get_entry(&target_id) else {
88 return ctx.retire(Err(AdapterError::ConcurrentDependencyDrop {
89 dependency_kind: "table",
90 dependency_id: target_id.to_string(),
91 }));
92 };
93 let Some(dest_table) = entry.table() else {
94 let typ = entry.item().typ();
95 let msg = format!("programming error: expected a Table found {typ:?}");
96 return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
97 };
98
99 let ingestion_id = Uuid::new_v4();
101 let collection_id = dest_table.global_id_writes();
102
103 let format = match params {
104 CopyFormatParams::Csv(csv) => {
105 mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
106 }
107 CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
108 CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
109 mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
110 ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
111 return;
112 }
113 };
114
115 let source = match source {
116 CopyFromSource::Url(from_expr) => {
117 let url = return_if_err!(eval_uri(from_expr), ctx);
118 let result = Url::parse(&url)
120 .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
121 let url = return_if_err!(result, ctx);
122
123 match url.scheme() {
130 "http" | "https" => {}
131 other => {
132 return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
133 "only 'http://' and 'https://' urls are supported as COPY FROM \
134 target, got '{other}://'"
135 ))));
136 }
137 }
138 let enforce_external_addresses =
139 mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
140 .get(self.controller.storage.config().config_set());
141 if enforce_external_addresses {
142 if let Err(err) = mz_ore::netio::ensure_url_ip_global(&url) {
143 return ctx
144 .retire(Err(AdapterError::Unstructured(anyhow::anyhow!("{err}"))));
145 }
146 }
147 mz_storage_types::oneshot_sources::ContentSource::Http { url }
148 }
149 CopyFromSource::AwsS3 {
150 uri,
151 connection,
152 connection_id,
153 } => {
154 let uri = return_if_err!(eval_uri(uri), ctx);
155
156 let result = http::Uri::from_str(&uri)
161 .map_err(|err| {
162 AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
163 })
164 .and_then(|uri| {
165 if uri.scheme_str() != Some("s3") && uri.scheme_str() != Some("gs") {
166 coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY FROM target");
167 }
168 Ok(uri)
169 })
170 .and_then(|uri| {
171 if uri.host().is_none() {
172 coord_bail!("missing bucket name from 's3://...' url");
173 }
174 Ok(uri)
175 });
176 let uri = return_if_err!(result, ctx);
177
178 mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
179 connection,
180 connection_id,
181 uri: uri.to_string(),
182 }
183 }
184 CopyFromSource::Stdin => {
185 unreachable!("COPY FROM STDIN should be handled elsewhere")
186 }
187 };
188
189 let filter = match filter {
190 None => mz_storage_types::oneshot_sources::ContentFilter::None,
191 Some(CopyFromFilter::Files(files)) => {
192 mz_storage_types::oneshot_sources::ContentFilter::Files(files)
193 }
194 Some(CopyFromFilter::Pattern(pattern)) => {
195 mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
196 }
197 };
198
199 let source_mfp = mfp
200 .into_plan()
201 .map_err(|s| AdapterError::internal("copy_from", s))
202 .and_then(|mfp| {
203 mfp.into_nontemporal().map_err(|_| {
204 AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
205 })
206 });
207 let source_mfp = return_if_err!(source_mfp, ctx);
208
209 let shape = ContentShape {
210 source_desc,
211 source_mfp,
212 };
213
214 let request = OneshotIngestionRequest {
215 source,
216 format,
217 filter,
218 shape,
219 };
220
221 let target_cluster = match self
222 .catalog()
223 .resolve_target_cluster(target_cluster, ctx.session())
224 {
225 Ok(cluster) => cluster,
226 Err(err) => {
227 return ctx.retire(Err(err));
228 }
229 };
230 let cluster_id = target_cluster.id;
231
232 let command_tx = self.internal_cmd_tx.clone();
235 let conn_id = ctx.session().conn_id().clone();
236 let closure = Box::new(move |batches| {
237 let _ = command_tx.send(crate::coord::Message::StagedBatches {
238 conn_id,
239 table_id: target_id,
240 batches,
241 });
242 });
243 let conn_id = ctx.session().conn_id().clone();
245 self.active_copies.insert(
246 conn_id,
247 ActiveCopyFrom {
248 ingestion_id,
249 cluster_id,
250 table_id: target_id,
251 ctx,
252 },
253 );
254
255 let _result = self
256 .controller
257 .storage
258 .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
259 .await;
260 }
261
262 pub(crate) fn setup_copy_from_stdin(
269 &self,
270 session: &Session,
271 target_id: CatalogItemId,
272 target_name: String,
273 columns: Vec<ColumnIndex>,
274 row_desc: RelationDesc,
275 params: CopyFormatParams<'static>,
276 ) -> Result<CopyFromStdinWriter, AdapterError> {
277 let Some(entry) = self.catalog().try_get_entry(&target_id) else {
279 return Err(AdapterError::ConcurrentDependencyDrop {
280 dependency_kind: "table",
281 dependency_id: target_id.to_string(),
282 });
283 };
284 let Some(dest_table) = entry.table() else {
285 let typ = entry.item().typ();
286 return Err(AdapterError::Unstructured(anyhow::anyhow!(
287 "programming error: expected a Table found {typ:?}"
288 )));
289 };
290 let collection_id = dest_table.global_id_writes();
291
292 let collection_meta = self
293 .controller
294 .storage
295 .collection_metadata(collection_id)
296 .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("{e}")))?;
297 let shard_id = collection_meta.data_shard;
298 let collection_desc = collection_meta.relation_desc.clone();
299
300 let pcx = session.pcx().clone();
302 let session_meta = session.meta();
303 let catalog = self.catalog().clone();
304 let conn_catalog = catalog.for_session(session);
305 let catalog_state = conn_catalog.state();
306 let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());
307
308 let target_desc = catalog
310 .try_get_entry(&target_id)
311 .expect("table must exist")
312 .relation_desc_latest()
313 .expect("table has desc")
314 .into_owned();
315 let all_columns_in_order = columns.len() == target_desc.arity()
316 && columns.iter().enumerate().all(|(i, c)| c.to_raw() == i);
317
318 let column_transform = if all_columns_in_order {
321 None
322 } else {
323 let dummy_datums: Vec<Datum> = columns.iter().map(|_| Datum::Null).collect();
324 let dummy_row = Row::pack(&dummy_datums);
325
326 let prep = ExprPrepOneShot {
327 logical_time: EvalTime::NotAvailable,
328 session: &session_meta,
329 catalog_state,
330 };
331 let mut optimizer = optimize::view::Optimizer::new_with_prep_no_limit(
332 optimizer_config.clone(),
333 None,
334 prep,
335 );
336
337 let hir = mz_sql::plan::plan_copy_from(
338 &pcx,
339 &conn_catalog,
340 target_id,
341 target_name.clone(),
342 columns.clone(),
343 vec![dummy_row],
344 )?;
345 let mir = optimize::Optimize::optimize(&mut optimizer, hir)?;
346 let mir_expr = mir.into_inner();
347 let (result_ref, _) = mir_expr
348 .as_const()
349 .expect("optimizer should produce constant");
350 let result_rows = result_ref
351 .clone()
352 .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("eval error: {e}")))?;
353
354 let (full_row, _) = result_rows.into_iter().next().expect("should have one row");
355 let full_datums: Vec<Datum> = full_row.unpack();
356
357 let col_to_source: std::collections::BTreeMap<ColumnIndex, usize> =
358 columns.iter().enumerate().map(|(a, b)| (*b, a)).collect();
359
360 let mut sources: Vec<ColumnSource> = Vec::with_capacity(target_desc.arity());
361 let mut default_datums: Vec<Datum> = Vec::new();
362
363 for i in 0..target_desc.arity() {
364 let col_idx = ColumnIndex::from_raw(i);
365 if let Some(&src_idx) = col_to_source.get(&col_idx) {
366 sources.push(ColumnSource::Input(src_idx));
367 } else {
368 sources.push(ColumnSource::Default(default_datums.len()));
369 default_datums.push(full_datums[i]);
370 }
371 }
372
373 let defaults_row = Row::pack(&default_datums);
374
375 Some(ColumnTransform {
376 sources,
377 defaults_row,
378 })
379 };
380
381 let column_types: Arc<[mz_pgrepr::Type]> = row_desc
383 .typ()
384 .column_types
385 .iter()
386 .map(|x| &x.scalar_type)
387 .map(mz_pgrepr::Type::from)
388 .collect::<Vec<_>>()
389 .into();
390
391 let num_workers = std::thread::available_parallelism()
393 .map(|n| n.get())
394 .unwrap_or(1);
395 tracing::info!(
396 %target_id, num_workers,
397 "starting parallel COPY FROM STDIN batch builders"
398 );
399
400 let column_transform = Arc::new(column_transform);
402 let target_desc = Arc::new(target_desc);
403 let collection_desc = Arc::new(collection_desc);
404 let persist_client = self.persist_client.clone();
405
406 let rt_handle = tokio::runtime::Handle::current();
411 let mut batch_txs = Vec::with_capacity(num_workers);
412 let mut worker_handles = Vec::with_capacity(num_workers);
413
414 let first_chunk_has_header = params.requires_header();
420 let mut worker_params = params;
421 if let CopyFormatParams::Csv(ref mut csv) = worker_params {
422 csv.header = false;
423 }
424
425 for worker_id in 0..num_workers {
426 let (batch_tx, batch_rx) = mpsc::channel::<Vec<u8>>(1);
429 batch_txs.push(batch_tx);
430
431 let persist_client = persist_client.clone();
432 let column_types = Arc::clone(&column_types);
433 let column_transform = Arc::clone(&column_transform);
434 let target_desc = Arc::clone(&target_desc);
435 let collection_desc = Arc::clone(&collection_desc);
436 let params = worker_params.clone();
437 let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
440 let rt = rt_handle.clone();
441
442 let handle = mz_ore::task::spawn_blocking(
443 || format!("copy_from_stdin_worker:{target_id}:{worker_id}"),
444 move || {
445 rt.block_on(Self::copy_from_stdin_batch_builder(
446 persist_client,
447 shard_id,
448 collection_id,
449 collection_desc,
450 target_desc,
451 column_transform,
452 column_types,
453 params,
454 skip_header_on_first_chunk,
455 batch_rx,
456 ))
457 },
458 );
459 worker_handles.push(handle);
460 }
461
462 let (completion_tx, completion_rx) = oneshot::channel();
464 mz_ore::task::spawn(
465 || format!("copy_from_stdin_collector:{target_id}"),
466 async move {
467 let mut all_batches = Vec::with_capacity(num_workers);
468 let mut total_rows: u64 = 0;
469
470 for handle in worker_handles {
471 match handle.await {
472 Ok((proto_batches, count)) => {
473 all_batches.extend(proto_batches);
474 total_rows += count;
475 }
476 Err(e) => {
477 let _ = completion_tx.send(Err(e));
478 return;
479 }
480 }
481 }
482
483 let _ = completion_tx.send(Ok((all_batches, total_rows)));
484 },
485 );
486
487 Ok(CopyFromStdinWriter {
488 batch_txs,
489 completion_rx,
490 })
491 }
492
493 async fn copy_from_stdin_batch_builder(
496 persist_client: mz_persist_client::PersistClient,
497 shard_id: mz_persist_client::ShardId,
498 collection_id: mz_repr::GlobalId,
499 collection_desc: Arc<RelationDesc>,
500 target_desc: Arc<RelationDesc>,
501 column_transform: Arc<Option<ColumnTransform>>,
502 column_types: Arc<[mz_pgrepr::Type]>,
503 params: CopyFormatParams<'static>,
504 skip_header_on_first_chunk: bool,
505 mut batch_rx: mpsc::Receiver<Vec<u8>>,
506 ) -> Result<(Vec<ProtoBatch>, u64), AdapterError> {
507 let persist_diagnostics = Diagnostics {
508 shard_name: collection_id.to_string(),
509 handle_purpose: "CopyFromStdin::batch_builder".to_string(),
510 };
511 let write_handle = persist_client
512 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
513 shard_id,
514 collection_desc,
515 Arc::new(UnitSchema),
516 persist_diagnostics,
517 )
518 .await
519 .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist open: {e}")))?;
520
521 let lower = mz_repr::Timestamp::MIN;
524 let upper = Antichain::from_elem(lower.step_forward());
525 let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
526 let mut row_count: u64 = 0;
527 let mut row_count_in_batch: u64 = 0;
528 let mut batch_bytes: usize = 0;
529 let mut proto_batches = Vec::new();
530
531 let mut is_first_chunk = true;
532 while let Some(raw_bytes) = batch_rx.recv().await {
533 let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
536 let mut p = params.clone();
537 if let CopyFormatParams::Csv(ref mut csv) = p {
538 csv.header = true;
539 }
540 p
541 } else {
542 params.clone()
543 };
544 is_first_chunk = false;
545 let rows = mz_pgcopy::decode_copy_format(&raw_bytes, &column_types, chunk_params)
546 .map_err(|e| AdapterError::CopyFormatError(e.to_string()))?;
547
548 for row in rows {
549 let full_row = if let Some(ref transform) = *column_transform {
551 transform.apply(&row)
552 } else {
553 row
554 };
555
556 for (i, datum) in full_row.iter().enumerate() {
558 target_desc.constraints_met(i, &datum).map_err(|e| {
559 AdapterError::Unstructured(anyhow::anyhow!("constraint violation: {e}"))
560 })?;
561 }
562
563 let data = SourceData(Ok(full_row));
564 batch_builder
565 .add(&data, &(), &lower, &1)
566 .await
567 .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist add: {e}")))?;
568 row_count += 1;
569 row_count_in_batch += 1;
570 }
571
572 batch_bytes = batch_bytes.saturating_add(raw_bytes.len());
573 if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
574 let batch = batch_builder.finish(upper.clone()).await.map_err(|e| {
575 AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}"))
576 })?;
577 proto_batches.push(batch.into_transmittable_batch());
578
579 batch_builder = write_handle.builder(Antichain::from_elem(lower));
580 row_count_in_batch = 0;
581 batch_bytes = 0;
582 }
583 }
584
585 if row_count_in_batch > 0 || proto_batches.is_empty() {
586 let batch = batch_builder
587 .finish(upper)
588 .await
589 .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}")))?;
590 proto_batches.push(batch.into_transmittable_batch());
591 }
592
593 Ok((proto_batches, row_count))
594 }
595
596 pub(crate) fn commit_staged_batches(
597 &mut self,
598 conn_id: ConnectionId,
599 table_id: CatalogItemId,
600 batches: Vec<Result<ProtoBatch, String>>,
601 ) {
602 let Some(active_copy) = self.active_copies.remove(&conn_id) else {
603 tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
605 return;
606 };
607
608 let ActiveCopyFrom {
609 ingestion_id,
610 cluster_id: _,
611 table_id: _,
612 mut ctx,
613 } = active_copy;
614 tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
615
616 let mut all_batches = SmallVec::with_capacity(batches.len());
617 let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
618 let mut row_count = 0u64;
619
620 for maybe_batch in batches {
621 match maybe_batch {
622 Ok(batch) => {
623 let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
624 all_batches.push(batch);
625 row_count = row_count.saturating_add(count);
626 }
627 Err(err) => all_errors.push(err),
628 }
629 }
630
631 if let Some(error) = all_errors.pop() {
633 tracing::warn!(?error, ?all_errors, "failed COPY FROM");
634
635 ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
639 "COPY FROM: {error}"
640 ))));
641
642 return;
643 }
644
645 let stage_write = ctx
650 .session_mut()
651 .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
652 id: table_id,
653 rows: TableData::Batches(all_batches),
654 }]));
655
656 if let Err(err) = stage_write {
657 ctx.retire(Err(err));
658 } else {
659 ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
660 }
661 }
662
663 #[mz_ore::instrument(level = "debug")]
665 pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
666 if let Some(ActiveCopyFrom {
667 ingestion_id,
668 cluster_id: _,
669 table_id: _,
670 ctx,
671 }) = self.active_copies.remove(conn_id)
672 {
673 let cancel_result = self
674 .controller
675 .storage
676 .cancel_oneshot_ingestion(ingestion_id);
677 if let Err(err) = cancel_result {
678 tracing::error!(?err, "failed to cancel OneshotIngestion");
679 }
680
681 ctx.retire(Err(AdapterError::Canceled));
682 }
683 }
684}
685
686struct ColumnTransform {
689 sources: Vec<ColumnSource>,
691 defaults_row: Row,
694}
695
/// Where one target column's value comes from when a `ColumnTransform` is
/// applied.
///
/// This is a small plain-data enum. It derives the usual value traits so it can
/// be copied, compared, and `Debug`-printed (e.g. in tracing or assertions).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnSource {
    /// Take the datum at this index of the decoded input row.
    Input(usize),
    /// Take the datum at this index of the transform's packed default values.
    Default(usize),
}
702
703impl ColumnTransform {
704 fn apply(&self, input: &Row) -> Row {
706 let input_datums: Vec<Datum> = input.unpack();
707 let default_datums: Vec<Datum> = self.defaults_row.unpack();
708 let mut output_datums = Vec::with_capacity(self.sources.len());
709 for source in &self.sources {
710 match source {
711 ColumnSource::Input(idx) => output_datums.push(input_datums[*idx]),
712 ColumnSource::Default(idx) => output_datums.push(default_datums[*idx]),
713 }
714 }
715 Row::pack(&output_datums)
716 }
717}