// mz_adapter/coord/sequencer/inner/copy_from.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_adapter_types::connection::ConnectionId;
14use mz_ore::cast::CastInto;
15use mz_persist_client::Diagnostics;
16use mz_persist_client::batch::ProtoBatch;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::{CatalogItemId, ColumnIndex, Datum, RelationDesc, Row, RowArena};
20use mz_sql::catalog::SessionCatalog;
21use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
22use mz_sql::session::metadata::SessionMetadata;
23use mz_storage_client::client::TableData;
24use mz_storage_types::StorageDiff;
25use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
26use mz_storage_types::sources::SourceData;
27use smallvec::SmallVec;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use url::Url;
31use uuid::Uuid;
32
33use crate::command::CopyFromStdinWriter;
34use crate::coord::sequencer::inner::return_if_err;
35use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
36use crate::optimize;
37use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
38use crate::session::{Session, TransactionOps, WriteOp};
39use crate::{AdapterError, ExecuteContext, ExecuteResponse};
40
/// Finalize persist batches periodically during COPY FROM STDIN to avoid
/// unbounded in-memory growth in a single giant batch.
///
/// Note: this threshold is compared against the cumulative size of the *raw*
/// (pre-decode) byte chunks fed to a worker, not the encoded batch size.
const COPY_FROM_STDIN_MAX_BATCH_BYTES: usize = 32 * 1024 * 1024;
44
45impl Coordinator {
    /// Sequences a `COPY <table> FROM <url | s3 uri>` statement.
    ///
    /// Evaluates and validates the source location, assembles a
    /// [`OneshotIngestionRequest`], stashes the [`ExecuteContext`] in
    /// `active_copies` (so the COPY can be canceled), and hands the request to
    /// the storage controller. The staged batches arrive back asynchronously
    /// via a `Message::StagedBatches` command, which is handled by
    /// `commit_staged_batches`.
    pub(crate) async fn sequence_copy_from(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CopyFromPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::CopyFromPlan {
            target_name: _,
            target_id,
            source,
            columns: _,
            source_desc,
            mfp,
            params,
            filter,
        } = plan;

        // Evaluate an uncorrelated scalar expression (the URL/URI argument of
        // COPY FROM) down to a plain string, without running a dataflow.
        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
            let style = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
            style.prep_scalar_expr(&mut from)?;

            // TODO(cf3): Add structured errors for the below uses of `coord_bail!`
            // and AdapterError::Unstructured.
            let temp_storage = RowArena::new();
            let eval_result = from.eval(&[], &temp_storage)?;
            let eval_string = match eval_result {
                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
                Datum::String(url_str) => url_str,
                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
            };

            Ok(eval_string.to_string())
        };

        // We check in planning that we're copying into a Table, but be defensive.
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return ctx.retire(Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            }));
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            let msg = format!("programming error: expected a Table found {typ:?}");
            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
        };

        // Generate a unique UUID for our ingestion.
        let ingestion_id = Uuid::new_v4();
        let collection_id = dest_table.global_id_writes();

        // Map the COPY format options onto the storage-layer content format.
        // TEXT and BINARY are only supported for COPY FROM STDIN and should
        // already have been rejected during planning.
        let format = match params {
            CopyFormatParams::Csv(csv) => {
                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
            }
            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
                return;
            }
        };

        let source = match source {
            CopyFromSource::Url(from_expr) => {
                let url = return_if_err!(eval_uri(from_expr), ctx);
                // TODO(cf2): Structured errors.
                let result = Url::parse(&url)
                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
                let url = return_if_err!(result, ctx);

                // Only allow http(s) schemes. Technically we would fail later, as the current
                // crate (reqwest) doesn't support other schemes.
                // Prefer to fail early and explicitly in case the downstream ever changes.
                // DNS resolution for hostnames is performed at execution time to avoid stalls
                // during sequencing; IP-literal hosts are validated here because reqwest's
                // custom DNS resolver is only invoked for hostnames.
                match url.scheme() {
                    "http" | "https" => {}
                    other => {
                        return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "only 'http://' and 'https://' urls are supported as COPY FROM \
                             target, got '{other}://'"
                        ))));
                    }
                }
                // SSRF protection: optionally reject URLs whose IP-literal
                // hosts resolve to non-global (e.g. private/loopback) ranges.
                let enforce_external_addresses =
                    mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
                        .get(self.controller.storage.config().config_set());
                if enforce_external_addresses {
                    if let Err(err) = mz_ore::netio::ensure_url_ip_global(&url) {
                        return ctx
                            .retire(Err(AdapterError::Unstructured(anyhow::anyhow!("{err}"))));
                    }
                }
                mz_storage_types::oneshot_sources::ContentSource::Http { url }
            }
            CopyFromSource::AwsS3 {
                uri,
                connection,
                connection_id,
            } => {
                let uri = return_if_err!(eval_uri(uri), ctx);

                // Validate the URI is an S3 URI, with a bucket name. We rely on validating here
                // and expect it in clusterd.
                //
                // TODO(cf2): Structured errors.
                let result = http::Uri::from_str(&uri)
                    .map_err(|err| {
                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
                    })
                    .and_then(|uri| {
                        if uri.scheme_str() != Some("s3") && uri.scheme_str() != Some("gs") {
                            coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY FROM target");
                        }
                        Ok(uri)
                    })
                    .and_then(|uri| {
                        if uri.host().is_none() {
                            coord_bail!("missing bucket name from 's3://...' url");
                        }
                        Ok(uri)
                    });
                let uri = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
                    connection,
                    connection_id,
                    uri: uri.to_string(),
                }
            }
            CopyFromSource::Stdin => {
                unreachable!("COPY FROM STDIN should be handled elsewhere")
            }
        };

        let filter = match filter {
            None => mz_storage_types::oneshot_sources::ContentFilter::None,
            Some(CopyFromFilter::Files(files)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
            }
            Some(CopyFromFilter::Pattern(pattern)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
            }
        };

        // Lower the MFP; temporal filters make no sense for a oneshot
        // ingestion, so a temporal MFP is an internal error here.
        let source_mfp = mfp
            .into_plan()
            .map_err(|s| AdapterError::internal("copy_from", s))
            .and_then(|mfp| {
                mfp.into_nontemporal().map_err(|_| {
                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
                })
            });
        let source_mfp = return_if_err!(source_mfp, ctx);

        let shape = ContentShape {
            source_desc,
            source_mfp,
        };

        let request = OneshotIngestionRequest {
            source,
            format,
            filter,
            shape,
        };

        let target_cluster = match self
            .catalog()
            .resolve_target_cluster(target_cluster, ctx.session())
        {
            Ok(cluster) => cluster,
            Err(err) => {
                return ctx.retire(Err(err));
            }
        };
        let cluster_id = target_cluster.id;

        // When we finish staging the Batches in Persist, we'll send a command
        // to the Coordinator.
        let command_tx = self.internal_cmd_tx.clone();
        let conn_id = ctx.session().conn_id().clone();
        let closure = Box::new(move |batches| {
            let _ = command_tx.send(crate::coord::Message::StagedBatches {
                conn_id,
                table_id: target_id,
                batches,
            });
        });
        // Stash the execute context so we can cancel the COPY.
        //
        // NOTE: this must happen before `create_oneshot_ingestion`, so the
        // entry exists by the time the completion callback can fire.
        let conn_id = ctx.session().conn_id().clone();
        self.active_copies.insert(
            conn_id,
            ActiveCopyFrom {
                ingestion_id,
                cluster_id,
                table_id: target_id,
                ctx,
            },
        );

        // NOTE(review): the controller result is intentionally discarded here;
        // presumably errors surface through the staged-batches callback —
        // confirm before relying on this.
        let _result = self
            .controller
            .storage
            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
            .await;
    }
260
    /// Sets up a streaming COPY FROM STDIN operation.
    ///
    /// Spawns N parallel background batch builder tasks that each receive
    /// raw byte chunks, decode them, apply column defaults/reordering,
    /// and build persist batches. Returns a [`CopyFromStdinWriter`] for
    /// pgwire to distribute raw byte chunks across the workers.
    ///
    /// Arguments:
    /// * `target_id` / `target_name`: the table the data is copied into.
    /// * `columns`: the (possibly partial, possibly reordered) column list
    ///   from the COPY statement.
    /// * `row_desc`: relation description of the incoming rows; used to
    ///   derive the pgrepr types for decoding.
    /// * `params`: COPY format options (e.g. CSV with/without header).
    ///
    /// Errors if the target table was concurrently dropped, is not a table,
    /// or if pre-computing the column-default transform fails.
    pub(crate) fn setup_copy_from_stdin(
        &self,
        session: &Session,
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: RelationDesc,
        params: CopyFormatParams<'static>,
    ) -> Result<CopyFromStdinWriter, AdapterError> {
        // Look up the table and its persist shard metadata.
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            });
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            return Err(AdapterError::Unstructured(anyhow::anyhow!(
                "programming error: expected a Table found {typ:?}"
            )));
        };
        let collection_id = dest_table.global_id_writes();

        let collection_meta = self
            .controller
            .storage
            .collection_metadata(collection_id)
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("{e}")))?;
        let shard_id = collection_meta.data_shard;
        let collection_desc = collection_meta.relation_desc.clone();

        // Pre-compute the column transformation.
        let pcx = session.pcx().clone();
        let session_meta = session.meta();
        let catalog = self.catalog().clone();
        let conn_catalog = catalog.for_session(session);
        let catalog_state = conn_catalog.state();
        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());

        // Determine if we need column rewriting (defaults/reordering).
        let target_desc = catalog
            .try_get_entry(&target_id)
            .expect("table must exist")
            .relation_desc_latest()
            .expect("table has desc")
            .into_owned();
        let all_columns_in_order = columns.len() == target_desc.arity()
            && columns.iter().enumerate().all(|(i, c)| c.to_raw() == i);

        // If we need column rewriting, pre-compute the transform by running
        // plan_copy_from with a single dummy row through the optimizer.
        let column_transform = if all_columns_in_order {
            None
        } else {
            // Plan a COPY of one all-NULL dummy row; the optimizer constant-
            // folds it into a full row whose non-input positions carry the
            // evaluated column defaults.
            let dummy_datums: Vec<Datum> = columns.iter().map(|_| Datum::Null).collect();
            let dummy_row = Row::pack(&dummy_datums);

            let prep = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: &session_meta,
                catalog_state,
            };
            let mut optimizer = optimize::view::Optimizer::new_with_prep_no_limit(
                optimizer_config.clone(),
                None,
                prep,
            );

            let hir = mz_sql::plan::plan_copy_from(
                &pcx,
                &conn_catalog,
                target_id,
                target_name.clone(),
                columns.clone(),
                vec![dummy_row],
            )?;
            let mir = optimize::Optimize::optimize(&mut optimizer, hir)?;
            let mir_expr = mir.into_inner();
            let (result_ref, _) = mir_expr
                .as_const()
                .expect("optimizer should produce constant");
            let result_rows = result_ref
                .clone()
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("eval error: {e}")))?;

            let (full_row, _) = result_rows.into_iter().next().expect("should have one row");
            let full_datums: Vec<Datum> = full_row.unpack();

            // Map each target-table column to its position in the COPY column
            // list, if present.
            let col_to_source: std::collections::BTreeMap<ColumnIndex, usize> =
                columns.iter().enumerate().map(|(a, b)| (*b, a)).collect();

            let mut sources: Vec<ColumnSource> = Vec::with_capacity(target_desc.arity());
            let mut default_datums: Vec<Datum> = Vec::new();

            for i in 0..target_desc.arity() {
                let col_idx = ColumnIndex::from_raw(i);
                if let Some(&src_idx) = col_to_source.get(&col_idx) {
                    sources.push(ColumnSource::Input(src_idx));
                } else {
                    // Column absent from the COPY list: take its pre-computed
                    // default from the constant-folded row.
                    sources.push(ColumnSource::Default(default_datums.len()));
                    default_datums.push(full_datums[i]);
                }
            }

            let defaults_row = Row::pack(&default_datums);

            Some(ColumnTransform {
                sources,
                defaults_row,
            })
        };

        // Compute column types for decoding (same logic as pgwire used to do).
        let column_types: Arc<[mz_pgrepr::Type]> = row_desc
            .typ()
            .column_types
            .iter()
            .map(|x| &x.scalar_type)
            .map(mz_pgrepr::Type::from)
            .collect::<Vec<_>>()
            .into();

        // Determine number of parallel workers.
        let num_workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        tracing::info!(
            %target_id, num_workers,
            "starting parallel COPY FROM STDIN batch builders"
        );

        // Shared state across workers.
        let column_transform = Arc::new(column_transform);
        let target_desc = Arc::new(target_desc);
        let collection_desc = Arc::new(collection_desc);
        let persist_client = self.persist_client.clone();

        // Create per-worker channels and spawn workers on blocking threads.
        // Each worker does CPU-intensive TSV decoding + columnar encoding,
        // so they need dedicated OS threads (not tokio async tasks) for
        // true parallelism.
        let rt_handle = tokio::runtime::Handle::current();
        let mut batch_txs = Vec::with_capacity(num_workers);
        let mut worker_handles = Vec::with_capacity(num_workers);

        // When COPY FROM uses CSV with HEADER, only the very first chunk in
        // the stream contains the real header line. The pgwire handler splits
        // data into ~32MB chunks distributed round-robin across workers, so
        // subsequent chunks' first rows are data, not headers. We must only
        // skip the header on the first chunk of worker 0.
        let first_chunk_has_header = params.requires_header();
        let mut worker_params = params;
        if let CopyFormatParams::Csv(ref mut csv) = worker_params {
            csv.header = false;
        }

        for worker_id in 0..num_workers {
            // Keep in-flight buffering tight: at most one chunk queued per
            // worker in addition to the currently-processed chunk.
            let (batch_tx, batch_rx) = mpsc::channel::<Vec<u8>>(1);
            batch_txs.push(batch_tx);

            let persist_client = persist_client.clone();
            let column_types = Arc::clone(&column_types);
            let column_transform = Arc::clone(&column_transform);
            let target_desc = Arc::clone(&target_desc);
            let collection_desc = Arc::clone(&collection_desc);
            let params = worker_params.clone();
            // Only worker 0 receives the first chunk (round-robin), so only
            // it needs to skip the CSV header on its first chunk.
            let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
            let rt = rt_handle.clone();

            // The worker body is async (persist I/O), so run it to completion
            // on the runtime from inside the dedicated blocking thread.
            let handle = mz_ore::task::spawn_blocking(
                || format!("copy_from_stdin_worker:{target_id}:{worker_id}"),
                move || {
                    rt.block_on(Self::copy_from_stdin_batch_builder(
                        persist_client,
                        shard_id,
                        collection_id,
                        collection_desc,
                        target_desc,
                        column_transform,
                        column_types,
                        params,
                        skip_header_on_first_chunk,
                        batch_rx,
                    ))
                },
            );
            worker_handles.push(handle);
        }

        // Spawn a collector task that waits for all workers.
        // It aggregates per-worker batches/row counts, failing fast on the
        // first worker error.
        let (completion_tx, completion_rx) = oneshot::channel();
        mz_ore::task::spawn(
            || format!("copy_from_stdin_collector:{target_id}"),
            async move {
                let mut all_batches = Vec::with_capacity(num_workers);
                let mut total_rows: u64 = 0;

                for handle in worker_handles {
                    match handle.await {
                        Ok((proto_batches, count)) => {
                            all_batches.extend(proto_batches);
                            total_rows += count;
                        }
                        Err(e) => {
                            let _ = completion_tx.send(Err(e));
                            return;
                        }
                    }
                }

                let _ = completion_tx.send(Ok((all_batches, total_rows)));
            },
        );

        Ok(CopyFromStdinWriter {
            batch_txs,
            completion_rx,
        })
    }
491
    /// Background task: receives raw byte chunks, decodes rows, and builds
    /// persist batches. One instance runs per parallel worker.
    ///
    /// Returns the transmittable persist batches plus the number of rows this
    /// worker decoded. All data is written at `Timestamp::MIN`; the
    /// coordinator re-timestamps the batches when the transaction commits.
    /// Batches are rotated once [`COPY_FROM_STDIN_MAX_BATCH_BYTES`] of raw
    /// input has been folded in, bounding in-memory batch growth.
    async fn copy_from_stdin_batch_builder(
        persist_client: mz_persist_client::PersistClient,
        shard_id: mz_persist_client::ShardId,
        collection_id: mz_repr::GlobalId,
        collection_desc: Arc<RelationDesc>,
        target_desc: Arc<RelationDesc>,
        column_transform: Arc<Option<ColumnTransform>>,
        column_types: Arc<[mz_pgrepr::Type]>,
        params: CopyFormatParams<'static>,
        skip_header_on_first_chunk: bool,
        mut batch_rx: mpsc::Receiver<Vec<u8>>,
    ) -> Result<(Vec<ProtoBatch>, u64), AdapterError> {
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFromStdin::batch_builder".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                collection_desc,
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist open: {e}")))?;

        // Build a batch at the minimum timestamp. The coordinator will
        // re-timestamp it during commit.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());
        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
        let mut row_count: u64 = 0;
        let mut row_count_in_batch: u64 = 0;
        let mut batch_bytes: usize = 0;
        let mut proto_batches = Vec::new();

        // The channel closes when pgwire drops the senders, ending the loop.
        let mut is_first_chunk = true;
        while let Some(raw_bytes) = batch_rx.recv().await {
            // Decode raw bytes into rows. For the first chunk of worker 0,
            // re-enable header skipping so the real CSV header line is skipped.
            let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
                let mut p = params.clone();
                if let CopyFormatParams::Csv(ref mut csv) = p {
                    csv.header = true;
                }
                p
            } else {
                params.clone()
            };
            is_first_chunk = false;
            let rows = mz_pgcopy::decode_copy_format(&raw_bytes, &column_types, chunk_params)
                .map_err(|e| AdapterError::CopyFormatError(e.to_string()))?;

            for row in rows {
                // Apply column transform if needed (add defaults, reorder).
                let full_row = if let Some(ref transform) = *column_transform {
                    transform.apply(&row)
                } else {
                    row
                };

                // Check constraints.
                for (i, datum) in full_row.iter().enumerate() {
                    target_desc.constraints_met(i, &datum).map_err(|e| {
                        AdapterError::Unstructured(anyhow::anyhow!("constraint violation: {e}"))
                    })?;
                }

                // Each row is inserted once (diff = 1) at the minimum time.
                let data = SourceData(Ok(full_row));
                batch_builder
                    .add(&data, &(), &lower, &1)
                    .await
                    .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist add: {e}")))?;
                row_count += 1;
                row_count_in_batch += 1;
            }

            // Rotate to a fresh batch once enough raw input has accumulated,
            // keeping any single batch's memory footprint bounded.
            batch_bytes = batch_bytes.saturating_add(raw_bytes.len());
            if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
                let batch = batch_builder.finish(upper.clone()).await.map_err(|e| {
                    AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}"))
                })?;
                proto_batches.push(batch.into_transmittable_batch());

                batch_builder = write_handle.builder(Antichain::from_elem(lower));
                row_count_in_batch = 0;
                batch_bytes = 0;
            }
        }

        // Finish the trailing partial batch. The `is_empty` arm also ensures
        // we always emit at least one (possibly empty) batch, even when the
        // worker received no rows at all.
        if row_count_in_batch > 0 || proto_batches.is_empty() {
            let batch = batch_builder
                .finish(upper)
                .await
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}")))?;
            proto_batches.push(batch.into_transmittable_batch());
        }

        Ok((proto_batches, row_count))
    }
594
595    pub(crate) fn commit_staged_batches(
596        &mut self,
597        conn_id: ConnectionId,
598        table_id: CatalogItemId,
599        batches: Vec<Result<ProtoBatch, String>>,
600    ) {
601        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
602            // Getting a successful response for a cancel COPY FROM is unexpected.
603            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
604            return;
605        };
606
607        let ActiveCopyFrom {
608            ingestion_id,
609            cluster_id: _,
610            table_id: _,
611            mut ctx,
612        } = active_copy;
613        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
614
615        let mut all_batches = SmallVec::with_capacity(batches.len());
616        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
617        let mut row_count = 0u64;
618
619        for maybe_batch in batches {
620            match maybe_batch {
621                Ok(batch) => {
622                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
623                    all_batches.push(batch);
624                    row_count = row_count.saturating_add(count);
625                }
626                Err(err) => all_errors.push(err),
627            }
628        }
629
630        // If we got any errors we need to fail the whole operation.
631        if let Some(error) = all_errors.pop() {
632            tracing::warn!(?error, ?all_errors, "failed COPY FROM");
633
634            // TODO(cf1): Cleanup the existing ProtoBatches to prevent leaking them.
635            // TODO(cf2): Carry structured errors all the way through.
636
637            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
638                "COPY FROM: {error}"
639            ))));
640
641            return;
642        }
643
644        // Stage a WriteOp, then when the Session is retired we complete the
645        // transaction, which handles acquiring the write lock for `table_id`,
646        // advancing the timestamps of the staged batches, and waiting for
647        // everything to complete before sending a response to the client.
648        let stage_write = ctx
649            .session_mut()
650            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
651                id: table_id,
652                rows: TableData::Batches(all_batches),
653            }]));
654
655        if let Err(err) = stage_write {
656            ctx.retire(Err(err));
657        } else {
658            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
659        }
660    }
661
662    /// Cancel any active `COPY FROM` statements/oneshot ingestions.
663    #[mz_ore::instrument(level = "debug")]
664    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
665        if let Some(ActiveCopyFrom {
666            ingestion_id,
667            cluster_id: _,
668            table_id: _,
669            ctx,
670        }) = self.active_copies.remove(conn_id)
671        {
672            let cancel_result = self
673                .controller
674                .storage
675                .cancel_oneshot_ingestion(ingestion_id);
676            if let Err(err) = cancel_result {
677                tracing::error!(?err, "failed to cancel OneshotIngestion");
678            }
679
680            ctx.retire(Err(AdapterError::Canceled));
681        }
682    }
683}
684
/// Describes how to transform a partial row (with only specified columns)
/// into a full row matching the table schema.
///
/// Pre-computed once in `setup_copy_from_stdin` and shared (via `Arc`) by all
/// COPY FROM STDIN batch-builder workers.
struct ColumnTransform {
    /// For each column in the target table, where to get the value.
    /// Length equals the target table's arity.
    sources: Vec<ColumnSource>,
    /// Pre-computed default values for columns not in the COPY column list.
    /// Packed as a Row; indexed by the `Default(idx)` variant.
    defaults_row: Row,
}
694
/// Where a single output column's value comes from when expanding a partial
/// COPY row to the table's full schema.
enum ColumnSource {
    /// Take the value from the input row at this position.
    Input(usize),
    /// Use the pre-computed default at this index in `defaults_row`.
    Default(usize),
}
701
702impl ColumnTransform {
703    /// Apply the transform to produce a full row from a partial input row.
704    fn apply(&self, input: &Row) -> Row {
705        let input_datums: Vec<Datum> = input.unpack();
706        let default_datums: Vec<Datum> = self.defaults_row.unpack();
707        let mut output_datums = Vec::with_capacity(self.sources.len());
708        for source in &self.sources {
709            match source {
710                ColumnSource::Input(idx) => output_datums.push(input_datums[*idx]),
711                ColumnSource::Default(idx) => output_datums.push(default_datums[*idx]),
712            }
713        }
714        Row::pack(&output_datums)
715    }
716}