use std::str::FromStr;
use std::sync::Arc;

use mz_adapter_types::connection::ConnectionId;
use mz_ore::cast::CastInto;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::ProtoBatch;
use mz_persist_types::codec_impls::UnitSchema;
use mz_pgcopy::CopyFormatParams;
use mz_repr::{CatalogItemId, ColumnIndex, Datum, RelationDesc, Row, RowArena};
use mz_sql::catalog::SessionCatalog;
use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
use mz_storage_types::StorageDiff;
use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
use mz_storage_types::sources::SourceData;
use smallvec::SmallVec;
use timely::progress::Antichain;
use tokio::sync::{mpsc, oneshot};
use url::Url;
use uuid::Uuid;

use crate::command::CopyFromStdinWriter;
use crate::coord::sequencer::inner::return_if_err;
use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
use crate::optimize;
use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
use crate::session::{Session, TransactionOps, WriteOp};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};

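/// Maximum number of raw input bytes a worker accumulates in a single persist
/// batch before finishing it and starting a new one (32 MiB).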
const COPY_FROM_STDIN_MAX_BATCH_BYTES: usize = 32 * 1024 * 1024;

impl Coordinator {
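    /// Sequences a `COPY ... FROM <url | S3>` statement by kicking off a
    /// oneshot ingestion on a cluster and registering the in-flight copy so
    /// the staged batches (or a cancellation) can be routed back to this
    /// connection.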
    pub(crate) async fn sequence_copy_from(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CopyFromPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::CopyFromPlan {
            target_name: _,
            target_id,
            source,
            columns: _,
            source_desc,
            mfp,
            params,
            filter,
        } = plan;

        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
            let style = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
            style.prep_scalar_expr(&mut from)?;

            let temp_storage = RowArena::new();
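            // The expression must be a constant, so evaluate it against the
            // empty row.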
            let eval_result = from.eval(&[], &temp_storage)?;
            let eval_string = match eval_result {
                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
                Datum::String(url_str) => url_str,
                other => coord_bail!("programming error: COPY FROM target cannot be {other}"),
            };

            Ok(eval_string.to_string())
        };

        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
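            // The target table could have been dropped concurrently.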
            return ctx.retire(Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            }));
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            let msg = format!("programming error: expected a Table, found {typ:?}");
            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
        };

        let ingestion_id = Uuid::new_v4();
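        // The collection that writes to this table should currently target.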
        let collection_id = dest_table.global_id_writes();

        let format = match params {
            CopyFormatParams::Csv(csv) => {
                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
            }
            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
                return;
            }
        };

        let source = match source {
            CopyFromSource::Url(from_expr) => {
                let url = return_if_err!(eval_uri(from_expr), ctx);
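                // Validate the URL now so the user gets an immediate error
                // instead of a failed ingestion.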
                let result = Url::parse(&url)
                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
                let url = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::Http { url }
            }
            CopyFromSource::AwsS3 {
                uri,
                connection,
                connection_id,
            } => {
                let uri = return_if_err!(eval_uri(uri), ctx);

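                // Validate the S3 URI up front: it must parse, use the `s3`
                // scheme, and name a bucket.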
                let result = http::Uri::from_str(&uri)
                    .map_err(|err| {
                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
                    })
                    .and_then(|uri| {
                        if uri.scheme_str() != Some("s3") {
                            coord_bail!("only 's3://...' urls are supported as COPY FROM target");
                        }
                        Ok(uri)
                    })
                    .and_then(|uri| {
                        if uri.host().is_none() {
                            coord_bail!("missing bucket name from 's3://...' url");
                        }
                        Ok(uri)
                    });
                let uri = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
                    connection,
                    connection_id,
                    uri: uri.to_string(),
                }
            }
            CopyFromSource::Stdin => {
                unreachable!("COPY FROM STDIN should be handled elsewhere")
            }
        };

        let filter = match filter {
            None => mz_storage_types::oneshot_sources::ContentFilter::None,
            Some(CopyFromFilter::Files(files)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
            }
            Some(CopyFromFilter::Pattern(pattern)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
            }
        };

        let source_mfp = mfp
            .into_plan()
            .map_err(|s| AdapterError::internal("copy_from", s))
            .and_then(|mfp| {
                mfp.into_nontemporal().map_err(|_| {
                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
                })
            });
        let source_mfp = return_if_err!(source_mfp, ctx);

        let shape = ContentShape {
            source_desc,
            source_mfp,
        };

        let request = OneshotIngestionRequest {
            source,
            format,
            filter,
            shape,
        };

        let target_cluster = match self
            .catalog()
            .resolve_target_cluster(target_cluster, ctx.session())
        {
            Ok(cluster) => cluster,
            Err(err) => {
                return ctx.retire(Err(err));
            }
        };
        let cluster_id = target_cluster.id;

        let command_tx = self.internal_cmd_tx.clone();
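        // When the cluster finishes staging batches it reports them back to
        // the coordinator via this callback, to be committed in
        // `commit_staged_batches`.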
        let conn_id = ctx.session().conn_id().clone();
        let closure = Box::new(move |batches| {
            let _ = command_tx.send(crate::coord::Message::StagedBatches {
                conn_id,
                table_id: target_id,
                batches,
            });
        });
        let conn_id = ctx.session().conn_id().clone();
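        // Register the in-flight copy so the response (or a cancellation) can
        // be routed back to this connection.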
        self.active_copies.insert(
            conn_id,
            ActiveCopyFrom {
                ingestion_id,
                cluster_id,
                table_id: target_id,
                ctx,
            },
        );

        let _result = self
            .controller
            .storage
            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
            .await;
    }

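    /// Sets up the machinery for a `COPY ... FROM STDIN` statement: spawns one
    /// blocking batch-builder task per available core, plus a collector task
    /// that gathers the finished persist batches, and returns a
    /// [`CopyFromStdinWriter`] for feeding raw input chunks to the workers.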
    pub(crate) fn setup_copy_from_stdin(
        &self,
        session: &Session,
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: RelationDesc,
        params: CopyFormatParams<'static>,
    ) -> Result<CopyFromStdinWriter, AdapterError> {
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            });
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            return Err(AdapterError::Unstructured(anyhow::anyhow!(
                "programming error: expected a Table, found {typ:?}"
            )));
        };
        let collection_id = dest_table.global_id_writes();

        let collection_meta = self
            .controller
            .storage
            .collection_metadata(collection_id)
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("{e}")))?;
        let shard_id = collection_meta.data_shard;
        let collection_desc = collection_meta.relation_desc.clone();

        let pcx = session.pcx().clone();
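        // Capture everything needed to plan default values for any columns
        // the input does not provide.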
        let session_meta = session.meta();
        let catalog = self.catalog().clone();
        let conn_catalog = catalog.for_session(session);
        let catalog_state = conn_catalog.state();
        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());

        let target_desc = catalog
            .try_get_entry(&target_id)
            .expect("table must exist")
            .relation_desc_latest()
            .expect("table has desc")
            .into_owned();
        let all_columns_in_order = columns.len() == target_desc.arity()
            && columns.iter().enumerate().all(|(i, c)| c.to_raw() == i);

        let column_transform = if all_columns_in_order {
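            // Fast path: no reordering and no defaults required.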
            None
        } else {
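            // To learn each omitted column's default value, plan a `COPY FROM`
            // of a single dummy (all-NULL) row, optimize it down to a
            // constant, and read the defaults out of the resulting full-width
            // row.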
            let dummy_datums: Vec<Datum> = columns.iter().map(|_| Datum::Null).collect();
            let dummy_row = Row::pack(&dummy_datums);

            let prep = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: &session_meta,
                catalog_state,
            };
            let mut optimizer = optimize::view::Optimizer::new_with_prep_no_limit(
                optimizer_config.clone(),
                None,
                prep,
            );

            let hir = mz_sql::plan::plan_copy_from(
                &pcx,
                &conn_catalog,
                target_id,
                target_name.clone(),
                columns.clone(),
                vec![dummy_row],
            )?;
            let mir = optimize::Optimize::optimize(&mut optimizer, hir)?;
            let mir_expr = mir.into_inner();
            let (result_ref, _) = mir_expr
                .as_const()
                .expect("optimizer should produce constant");
            let result_rows = result_ref
                .clone()
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("eval error: {e}")))?;

            let (full_row, _) = result_rows.into_iter().next().expect("should have one row");
            let full_datums: Vec<Datum> = full_row.unpack();

            let col_to_source: std::collections::BTreeMap<ColumnIndex, usize> =
                columns.iter().enumerate().map(|(a, b)| (*b, a)).collect();

            let mut sources: Vec<ColumnSource> = Vec::with_capacity(target_desc.arity());
            let mut default_datums: Vec<Datum> = Vec::new();

            for i in 0..target_desc.arity() {
                let col_idx = ColumnIndex::from_raw(i);
                if let Some(&src_idx) = col_to_source.get(&col_idx) {
                    sources.push(ColumnSource::Input(src_idx));
                } else {
                    sources.push(ColumnSource::Default(default_datums.len()));
                    default_datums.push(full_datums[i]);
                }
            }

            let defaults_row = Row::pack(&default_datums);

            Some(ColumnTransform {
                sources,
                defaults_row,
            })
        };

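        // The pgwire decoders work in terms of postgres types, so translate
        // the relation's column types.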
        let column_types: Arc<[mz_pgrepr::Type]> = row_desc
            .typ()
            .column_types
            .iter()
            .map(|x| &x.scalar_type)
            .map(mz_pgrepr::Type::from)
            .collect::<Vec<_>>()
            .into();

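        // Spawn one batch-builder worker per available core, falling back to
        // a single worker if parallelism can't be determined.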
        let num_workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        tracing::info!(
            %target_id, num_workers,
            "starting parallel COPY FROM STDIN batch builders"
        );

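        // Wrap shared, read-only state in `Arc`s so it can be handed to every
        // worker without cloning the underlying data.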
        let column_transform = Arc::new(column_transform);
        let target_desc = Arc::new(target_desc);
        let collection_desc = Arc::new(collection_desc);
        let persist_client = self.persist_client.clone();

        let rt_handle = tokio::runtime::Handle::current();
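        // The workers run on blocking threads, which have no ambient Tokio
        // runtime, so capture a handle to the current one for persist I/O.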
        let mut batch_txs = Vec::with_capacity(num_workers);
        let mut worker_handles = Vec::with_capacity(num_workers);

        let first_chunk_has_header = params.requires_header();
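        // Only the very first chunk of input can contain the CSV header, and
        // it is assumed to be routed to worker 0. Disable header parsing in
        // the params given to the workers and re-enable it just for that
        // first chunk.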
        let mut worker_params = params;
        if let CopyFormatParams::Csv(ref mut csv) = worker_params {
            csv.header = false;
        }

        for worker_id in 0..num_workers {
            let (batch_tx, batch_rx) = mpsc::channel::<Vec<u8>>(1);
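            // Capacity 1 provides backpressure: a sender blocks while the
            // worker is still decoding the previous chunk.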
            batch_txs.push(batch_tx);

            let persist_client = persist_client.clone();
            let column_types = Arc::clone(&column_types);
            let column_transform = Arc::clone(&column_transform);
            let target_desc = Arc::clone(&target_desc);
            let collection_desc = Arc::clone(&collection_desc);
            let params = worker_params.clone();
            let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
            let rt = rt_handle.clone();

            let handle = mz_ore::task::spawn_blocking(
                || format!("copy_from_stdin_worker:{target_id}:{worker_id}"),
                move || {
                    rt.block_on(Self::copy_from_stdin_batch_builder(
                        persist_client,
                        shard_id,
                        collection_id,
                        collection_desc,
                        target_desc,
                        column_transform,
                        column_types,
                        params,
                        skip_header_on_first_chunk,
                        batch_rx,
                    ))
                },
            );
            worker_handles.push(handle);
        }

        let (completion_tx, completion_rx) = oneshot::channel();
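        // Collector task: waits for every worker to finish and forwards
        // either the combined batches and row count, or the first error.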
        mz_ore::task::spawn(
            || format!("copy_from_stdin_collector:{target_id}"),
            async move {
                let mut all_batches = Vec::with_capacity(num_workers);
                let mut total_rows: u64 = 0;

                for handle in worker_handles {
                    match handle.await {
                        Ok((proto_batches, count)) => {
                            all_batches.extend(proto_batches);
                            total_rows += count;
                        }
                        Err(e) => {
                            let _ = completion_tx.send(Err(e));
                            return;
                        }
                    }
                }

                let _ = completion_tx.send(Ok((all_batches, total_rows)));
            },
        );

        Ok(CopyFromStdinWriter {
            batch_txs,
            completion_rx,
        })
    }

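    /// Worker loop for `COPY ... FROM STDIN`: receives raw input chunks,
    /// decodes them with `mz_pgcopy`, applies the optional column transform,
    /// checks column constraints, and stages the rows into persist batches,
    /// finishing a batch whenever roughly
    /// [`COPY_FROM_STDIN_MAX_BATCH_BYTES`] of input has been consumed.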
    async fn copy_from_stdin_batch_builder(
        persist_client: mz_persist_client::PersistClient,
        shard_id: mz_persist_client::ShardId,
        collection_id: mz_repr::GlobalId,
        collection_desc: Arc<RelationDesc>,
        target_desc: Arc<RelationDesc>,
        column_transform: Arc<Option<ColumnTransform>>,
        column_types: Arc<[mz_pgrepr::Type]>,
        params: CopyFormatParams<'static>,
        skip_header_on_first_chunk: bool,
        mut batch_rx: mpsc::Receiver<Vec<u8>>,
    ) -> Result<(Vec<ProtoBatch>, u64), AdapterError> {
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFromStdin::batch_builder".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                collection_desc,
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist open: {e}")))?;

        let lower = mz_repr::Timestamp::MIN;
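        // Rows are staged at a placeholder timestamp; the staged batches are
        // later committed as part of the session's transaction, which assigns
        // the actual write timestamp.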
        let upper = Antichain::from_elem(lower.step_forward());
        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
        let mut row_count: u64 = 0;
        let mut row_count_in_batch: u64 = 0;
        let mut batch_bytes: usize = 0;
        let mut proto_batches = Vec::new();

        let mut is_first_chunk = true;
        while let Some(raw_bytes) = batch_rx.recv().await {
            let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
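                // Re-enable header parsing so the decoder skips the leading
                // header row in this chunk.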
                let mut p = params.clone();
                if let CopyFormatParams::Csv(ref mut csv) = p {
                    csv.header = true;
                }
                p
            } else {
                params.clone()
            };
            is_first_chunk = false;
            let rows = mz_pgcopy::decode_copy_format(&raw_bytes, &column_types, chunk_params)
                .map_err(|e| AdapterError::CopyFormatError(e.to_string()))?;

            for row in rows {
                let full_row = if let Some(ref transform) = *column_transform {
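                    // Reorder the input columns and splice in default values
                    // for any omitted columns.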
                    transform.apply(&row)
                } else {
                    row
                };

                for (i, datum) in full_row.iter().enumerate() {
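                    // Enforce column constraints (e.g. NOT NULL) before the
                    // row is written down.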
                    target_desc.constraints_met(i, &datum).map_err(|e| {
                        AdapterError::Unstructured(anyhow::anyhow!("constraint violation: {e}"))
                    })?;
                }

                let data = SourceData(Ok(full_row));
                batch_builder
                    .add(&data, &(), &lower, &1)
                    .await
                    .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist add: {e}")))?;
                row_count += 1;
                row_count_in_batch += 1;
            }

            batch_bytes = batch_bytes.saturating_add(raw_bytes.len());
            if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
                let batch = batch_builder.finish(upper.clone()).await.map_err(|e| {
                    AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}"))
                })?;
                proto_batches.push(batch.into_transmittable_batch());

                batch_builder = write_handle.builder(Antichain::from_elem(lower));
                row_count_in_batch = 0;
                batch_bytes = 0;
            }
        }

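        // Flush any remaining rows. If no input arrived at all we still
        // finish one (empty) batch so the caller always has something to
        // commit.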
        if row_count_in_batch > 0 || proto_batches.is_empty() {
            let batch = batch_builder
                .finish(upper)
                .await
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}")))?;
            proto_batches.push(batch.into_transmittable_batch());
        }

        Ok((proto_batches, row_count))
    }

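    /// Handles the `StagedBatches` message for a `COPY FROM`: stages the
    /// returned batches as a table write on the session's transaction, or
    /// retires the copy with an error if any worker failed.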
    pub(crate) fn commit_staged_batches(
        &mut self,
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    ) {
        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
            return;
        };

        let ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            mut ctx,
        } = active_copy;
        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");

        let mut all_batches = SmallVec::with_capacity(batches.len());
        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
        let mut row_count = 0u64;

        for maybe_batch in batches {
            match maybe_batch {
                Ok(batch) => {
                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
                    all_batches.push(batch);
                    row_count = row_count.saturating_add(count);
                }
                Err(err) => all_errors.push(err),
            }
        }

        if let Some(error) = all_errors.pop() {
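            // Report one error to the user; any others are captured in the
            // log.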
            tracing::warn!(?error, ?all_errors, "failed COPY FROM");

            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
                "COPY FROM: {error}"
            ))));

            return;
        }

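        // Stage the batches as a write op on the session's transaction; the
        // write becomes visible when the transaction commits.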
        let stage_write = ctx
            .session_mut()
            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
                id: table_id,
                rows: TableData::Batches(all_batches),
            }]));

        if let Err(err) = stage_write {
            ctx.retire(Err(err));
        } else {
            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
        }
    }

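    /// Cancels any in-flight `COPY FROM` for the given connection, telling
    /// the storage controller to cancel the oneshot ingestion and retiring
    /// the statement with [`AdapterError::Canceled`].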
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
        if let Some(ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            ctx,
        }) = self.active_copies.remove(conn_id)
        {
            let cancel_result = self
                .controller
                .storage
                .cancel_oneshot_ingestion(ingestion_id);
            if let Err(err) = cancel_result {
                tracing::error!(?err, "failed to cancel OneshotIngestion");
            }

            ctx.retire(Err(AdapterError::Canceled));
        }
    }
}

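/// Describes how to turn a decoded input row into a full row of the target
/// table when `COPY FROM` names only a subset of the table's columns, or
/// names them out of order.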
struct ColumnTransform {
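    /// For each target column, where its value comes from.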
    sources: Vec<ColumnSource>,
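    /// Pre-evaluated default values, indexed by `ColumnSource::Default`.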
    defaults_row: Row,
}

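/// The source of a single output column in a [`ColumnTransform`].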
enum ColumnSource {
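    /// Take the datum at this index of the decoded input row.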
    Input(usize),
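    /// Take the datum at this index of `ColumnTransform::defaults_row`.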
    Default(usize),
}

impl ColumnTransform {
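    /// Builds a full-width output row from a decoded input row by routing
    /// each column from either the input or the pre-computed defaults.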
    fn apply(&self, input: &Row) -> Row {
        let input_datums: Vec<Datum> = input.unpack();
        let default_datums: Vec<Datum> = self.defaults_row.unpack();
        let mut output_datums = Vec::with_capacity(self.sources.len());
        for source in &self.sources {
            match source {
                ColumnSource::Input(idx) => output_datums.push(input_datums[*idx]),
                ColumnSource::Default(idx) => output_datums.push(default_datums[*idx]),
            }
        }
        Row::pack(&output_datums)
    }
}