Skip to main content

mz_adapter/coord/sequencer/inner/
copy_from.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_adapter_types::connection::ConnectionId;
14use mz_expr::Eval;
15use mz_ore::cast::CastInto;
16use mz_persist_client::Diagnostics;
17use mz_persist_client::batch::ProtoBatch;
18use mz_persist_types::codec_impls::UnitSchema;
19use mz_pgcopy::CopyFormatParams;
20use mz_repr::{CatalogItemId, ColumnIndex, Datum, RelationDesc, Row, RowArena};
21use mz_sql::catalog::SessionCatalog;
22use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_storage_client::client::TableData;
25use mz_storage_types::StorageDiff;
26use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
27use mz_storage_types::sources::SourceData;
28use smallvec::SmallVec;
29use timely::progress::Antichain;
30use tokio::sync::{mpsc, oneshot};
31use url::Url;
32use uuid::Uuid;
33
34use crate::command::CopyFromStdinWriter;
35use crate::coord::sequencer::inner::return_if_err;
36use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
37use crate::optimize;
38use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
39use crate::session::{Session, TransactionOps, WriteOp};
40use crate::{AdapterError, ExecuteContext, ExecuteResponse};
41
/// Finalize persist batches periodically during COPY FROM STDIN to avoid
/// unbounded in-memory growth in a single giant batch.
///
/// The threshold is 32 MiB. Each batch-builder worker compares it against the
/// cumulative length of the raw (still encoded) input chunks it has consumed
/// since its last finished batch, not against the size of the decoded rows.
const COPY_FROM_STDIN_MAX_BATCH_BYTES: usize = 32 * 1024 * 1024;
45
46impl Coordinator {
47    pub(crate) async fn sequence_copy_from(
48        &mut self,
49        ctx: ExecuteContext,
50        plan: plan::CopyFromPlan,
51        target_cluster: TargetCluster,
52    ) {
53        let plan::CopyFromPlan {
54            target_name: _,
55            target_id,
56            source,
57            columns: _,
58            source_desc,
59            mfp,
60            params,
61            filter,
62        } = plan;
63
64        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
65            let style = ExprPrepOneShot {
66                logical_time: EvalTime::NotAvailable,
67                session: ctx.session(),
68                catalog_state: self.catalog().state(),
69            };
70            let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
71            style.prep_scalar_expr(&mut from)?;
72
73            // TODO(cf3): Add structured errors for the below uses of `coord_bail!`
74            // and AdapterError::Unstructured.
75            let temp_storage = RowArena::new();
76            let eval_result = from.eval(&[], &temp_storage)?;
77            let eval_string = match eval_result {
78                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
79                Datum::String(url_str) => url_str,
80                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
81            };
82
83            Ok(eval_string.to_string())
84        };
85
86        // We check in planning that we're copying into a Table, but be defensive.
87        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
88            return ctx.retire(Err(AdapterError::ConcurrentDependencyDrop {
89                dependency_kind: "table",
90                dependency_id: target_id.to_string(),
91            }));
92        };
93        let Some(dest_table) = entry.table() else {
94            let typ = entry.item().typ();
95            let msg = format!("programming error: expected a Table found {typ:?}");
96            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
97        };
98
99        // Generate a unique UUID for our ingestion.
100        let ingestion_id = Uuid::new_v4();
101        let collection_id = dest_table.global_id_writes();
102
103        let format = match params {
104            CopyFormatParams::Csv(csv) => {
105                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
106            }
107            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
108            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
109                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
110                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
111                return;
112            }
113        };
114
115        let source = match source {
116            CopyFromSource::Url(from_expr) => {
117                let url = return_if_err!(eval_uri(from_expr), ctx);
118                // TODO(cf2): Structured errors.
119                let result = Url::parse(&url)
120                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
121                let url = return_if_err!(result, ctx);
122
123                // Only allow http(s) schemes. Technically we would fail later, as the current
124                // crate (reqwest) doesn't support other schemes.
125                // Prefer to fail early and explicitly in case the downstream ever changes.
126                // DNS resolution for hostnames is performed at execution time to avoid stalls
127                // during sequencing; IP-literal hosts are validated here because reqwest's
128                // custom DNS resolver is only invoked for hostnames.
129                match url.scheme() {
130                    "http" | "https" => {}
131                    other => {
132                        return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
133                            "only 'http://' and 'https://' urls are supported as COPY FROM \
134                             target, got '{other}://'"
135                        ))));
136                    }
137                }
138                let enforce_external_addresses =
139                    mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
140                        .get(self.controller.storage.config().config_set());
141                if enforce_external_addresses {
142                    if let Err(err) = mz_ore::netio::ensure_url_ip_global(&url) {
143                        return ctx
144                            .retire(Err(AdapterError::Unstructured(anyhow::anyhow!("{err}"))));
145                    }
146                }
147                mz_storage_types::oneshot_sources::ContentSource::Http { url }
148            }
149            CopyFromSource::AwsS3 {
150                uri,
151                connection,
152                connection_id,
153            } => {
154                let uri = return_if_err!(eval_uri(uri), ctx);
155
156                // Validate the URI is an S3 URI, with a bucket name. We rely on validating here
157                // and expect it in clusterd.
158                //
159                // TODO(cf2): Structured errors.
160                let result = http::Uri::from_str(&uri)
161                    .map_err(|err| {
162                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
163                    })
164                    .and_then(|uri| {
165                        if uri.scheme_str() != Some("s3") && uri.scheme_str() != Some("gs") {
166                            coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY FROM target");
167                        }
168                        Ok(uri)
169                    })
170                    .and_then(|uri| {
171                        if uri.host().is_none() {
172                            coord_bail!("missing bucket name from 's3://...' url");
173                        }
174                        Ok(uri)
175                    });
176                let uri = return_if_err!(result, ctx);
177
178                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
179                    connection,
180                    connection_id,
181                    uri: uri.to_string(),
182                }
183            }
184            CopyFromSource::Stdin => {
185                unreachable!("COPY FROM STDIN should be handled elsewhere")
186            }
187        };
188
189        let filter = match filter {
190            None => mz_storage_types::oneshot_sources::ContentFilter::None,
191            Some(CopyFromFilter::Files(files)) => {
192                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
193            }
194            Some(CopyFromFilter::Pattern(pattern)) => {
195                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
196            }
197        };
198
199        let source_mfp = mfp
200            .into_plan()
201            .map_err(|s| AdapterError::internal("copy_from", s))
202            .and_then(|mfp| {
203                mfp.into_nontemporal().map_err(|_| {
204                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
205                })
206            });
207        let source_mfp = return_if_err!(source_mfp, ctx);
208
209        let shape = ContentShape {
210            source_desc,
211            source_mfp,
212        };
213
214        let request = OneshotIngestionRequest {
215            source,
216            format,
217            filter,
218            shape,
219        };
220
221        let target_cluster = match self
222            .catalog()
223            .resolve_target_cluster(target_cluster, ctx.session())
224        {
225            Ok(cluster) => cluster,
226            Err(err) => {
227                return ctx.retire(Err(err));
228            }
229        };
230        let cluster_id = target_cluster.id;
231
232        // When we finish staging the Batches in Persist, we'll send a command
233        // to the Coordinator.
234        let command_tx = self.internal_cmd_tx.clone();
235        let conn_id = ctx.session().conn_id().clone();
236        let closure = Box::new(move |batches| {
237            let _ = command_tx.send(crate::coord::Message::StagedBatches {
238                conn_id,
239                table_id: target_id,
240                batches,
241            });
242        });
243        // Stash the execute context so we can cancel the COPY.
244        let conn_id = ctx.session().conn_id().clone();
245        self.active_copies.insert(
246            conn_id,
247            ActiveCopyFrom {
248                ingestion_id,
249                cluster_id,
250                table_id: target_id,
251                ctx,
252            },
253        );
254
255        let _result = self
256            .controller
257            .storage
258            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
259            .await;
260    }
261
262    /// Sets up a streaming COPY FROM STDIN operation.
263    ///
264    /// Spawns N parallel background batch builder tasks that each receive
265    /// raw byte chunks, decode them, apply column defaults/reordering,
266    /// and build persist batches. Returns a [`CopyFromStdinWriter`] for
267    /// pgwire to distribute raw byte chunks across the workers.
268    pub(crate) fn setup_copy_from_stdin(
269        &self,
270        session: &Session,
271        target_id: CatalogItemId,
272        target_name: String,
273        columns: Vec<ColumnIndex>,
274        row_desc: RelationDesc,
275        params: CopyFormatParams<'static>,
276    ) -> Result<CopyFromStdinWriter, AdapterError> {
277        // Look up the table and its persist shard metadata.
278        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
279            return Err(AdapterError::ConcurrentDependencyDrop {
280                dependency_kind: "table",
281                dependency_id: target_id.to_string(),
282            });
283        };
284        let Some(dest_table) = entry.table() else {
285            let typ = entry.item().typ();
286            return Err(AdapterError::Unstructured(anyhow::anyhow!(
287                "programming error: expected a Table found {typ:?}"
288            )));
289        };
290        let collection_id = dest_table.global_id_writes();
291
292        let collection_meta = self
293            .controller
294            .storage
295            .collection_metadata(collection_id)
296            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("{e}")))?;
297        let shard_id = collection_meta.data_shard;
298        let collection_desc = collection_meta.relation_desc.clone();
299
300        // Pre-compute the column transformation.
301        let pcx = session.pcx().clone();
302        let session_meta = session.meta();
303        let catalog = self.catalog().clone();
304        let conn_catalog = catalog.for_session(session);
305        let catalog_state = conn_catalog.state();
306        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());
307
308        // Determine if we need column rewriting (defaults/reordering).
309        let target_desc = catalog
310            .try_get_entry(&target_id)
311            .expect("table must exist")
312            .relation_desc_latest()
313            .expect("table has desc")
314            .into_owned();
315        let all_columns_in_order = columns.len() == target_desc.arity()
316            && columns.iter().enumerate().all(|(i, c)| c.to_raw() == i);
317
318        // If we need column rewriting, pre-compute the transform by running
319        // plan_copy_from with a single dummy row through the optimizer.
320        let column_transform = if all_columns_in_order {
321            None
322        } else {
323            let dummy_datums: Vec<Datum> = columns.iter().map(|_| Datum::Null).collect();
324            let dummy_row = Row::pack(&dummy_datums);
325
326            let prep = ExprPrepOneShot {
327                logical_time: EvalTime::NotAvailable,
328                session: &session_meta,
329                catalog_state,
330            };
331            let mut optimizer = optimize::view::Optimizer::new_with_prep_no_limit(
332                optimizer_config.clone(),
333                None,
334                prep,
335            );
336
337            let hir = mz_sql::plan::plan_copy_from(
338                &pcx,
339                &conn_catalog,
340                target_id,
341                target_name.clone(),
342                columns.clone(),
343                vec![dummy_row],
344            )?;
345            let mir = optimize::Optimize::optimize(&mut optimizer, hir)?;
346            let mir_expr = mir.into_inner();
347            let (result_ref, _) = mir_expr
348                .as_const()
349                .expect("optimizer should produce constant");
350            let result_rows = result_ref
351                .clone()
352                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("eval error: {e}")))?;
353
354            let (full_row, _) = result_rows.into_iter().next().expect("should have one row");
355            let full_datums: Vec<Datum> = full_row.unpack();
356
357            let col_to_source: std::collections::BTreeMap<ColumnIndex, usize> =
358                columns.iter().enumerate().map(|(a, b)| (*b, a)).collect();
359
360            let mut sources: Vec<ColumnSource> = Vec::with_capacity(target_desc.arity());
361            let mut default_datums: Vec<Datum> = Vec::new();
362
363            for i in 0..target_desc.arity() {
364                let col_idx = ColumnIndex::from_raw(i);
365                if let Some(&src_idx) = col_to_source.get(&col_idx) {
366                    sources.push(ColumnSource::Input(src_idx));
367                } else {
368                    sources.push(ColumnSource::Default(default_datums.len()));
369                    default_datums.push(full_datums[i]);
370                }
371            }
372
373            let defaults_row = Row::pack(&default_datums);
374
375            Some(ColumnTransform {
376                sources,
377                defaults_row,
378            })
379        };
380
381        // Compute column types for decoding (same logic as pgwire used to do).
382        let column_types: Arc<[mz_pgrepr::Type]> = row_desc
383            .typ()
384            .column_types
385            .iter()
386            .map(|x| &x.scalar_type)
387            .map(mz_pgrepr::Type::from)
388            .collect::<Vec<_>>()
389            .into();
390
391        // Determine number of parallel workers.
392        let num_workers = std::thread::available_parallelism()
393            .map(|n| n.get())
394            .unwrap_or(1);
395        tracing::info!(
396            %target_id, num_workers,
397            "starting parallel COPY FROM STDIN batch builders"
398        );
399
400        // Shared state across workers.
401        let column_transform = Arc::new(column_transform);
402        let target_desc = Arc::new(target_desc);
403        let collection_desc = Arc::new(collection_desc);
404        let persist_client = self.persist_client.clone();
405
406        // Create per-worker channels and spawn workers on blocking threads.
407        // Each worker does CPU-intensive TSV decoding + columnar encoding,
408        // so they need dedicated OS threads (not tokio async tasks) for
409        // true parallelism.
410        let rt_handle = tokio::runtime::Handle::current();
411        let mut batch_txs = Vec::with_capacity(num_workers);
412        let mut worker_handles = Vec::with_capacity(num_workers);
413
414        // When COPY FROM uses CSV with HEADER, only the very first chunk in
415        // the stream contains the real header line. The pgwire handler splits
416        // data into ~32MB chunks distributed round-robin across workers, so
417        // subsequent chunks' first rows are data, not headers. We must only
418        // skip the header on the first chunk of worker 0.
419        let first_chunk_has_header = params.requires_header();
420        let mut worker_params = params;
421        if let CopyFormatParams::Csv(ref mut csv) = worker_params {
422            csv.header = false;
423        }
424
425        for worker_id in 0..num_workers {
426            // Keep in-flight buffering tight: at most one chunk queued per
427            // worker in addition to the currently-processed chunk.
428            let (batch_tx, batch_rx) = mpsc::channel::<Vec<u8>>(1);
429            batch_txs.push(batch_tx);
430
431            let persist_client = persist_client.clone();
432            let column_types = Arc::clone(&column_types);
433            let column_transform = Arc::clone(&column_transform);
434            let target_desc = Arc::clone(&target_desc);
435            let collection_desc = Arc::clone(&collection_desc);
436            let params = worker_params.clone();
437            // Only worker 0 receives the first chunk (round-robin), so only
438            // it needs to skip the CSV header on its first chunk.
439            let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
440            let rt = rt_handle.clone();
441
442            let handle = mz_ore::task::spawn_blocking(
443                || format!("copy_from_stdin_worker:{target_id}:{worker_id}"),
444                move || {
445                    rt.block_on(Self::copy_from_stdin_batch_builder(
446                        persist_client,
447                        shard_id,
448                        collection_id,
449                        collection_desc,
450                        target_desc,
451                        column_transform,
452                        column_types,
453                        params,
454                        skip_header_on_first_chunk,
455                        batch_rx,
456                    ))
457                },
458            );
459            worker_handles.push(handle);
460        }
461
462        // Spawn a collector task that waits for all workers.
463        let (completion_tx, completion_rx) = oneshot::channel();
464        mz_ore::task::spawn(
465            || format!("copy_from_stdin_collector:{target_id}"),
466            async move {
467                let mut all_batches = Vec::with_capacity(num_workers);
468                let mut total_rows: u64 = 0;
469
470                for handle in worker_handles {
471                    match handle.await {
472                        Ok((proto_batches, count)) => {
473                            all_batches.extend(proto_batches);
474                            total_rows += count;
475                        }
476                        Err(e) => {
477                            let _ = completion_tx.send(Err(e));
478                            return;
479                        }
480                    }
481                }
482
483                let _ = completion_tx.send(Ok((all_batches, total_rows)));
484            },
485        );
486
487        Ok(CopyFromStdinWriter {
488            batch_txs,
489            completion_rx,
490        })
491    }
492
493    /// Background task: receives raw byte chunks, decodes rows, and builds
494    /// persist batches. One instance runs per parallel worker.
495    async fn copy_from_stdin_batch_builder(
496        persist_client: mz_persist_client::PersistClient,
497        shard_id: mz_persist_client::ShardId,
498        collection_id: mz_repr::GlobalId,
499        collection_desc: Arc<RelationDesc>,
500        target_desc: Arc<RelationDesc>,
501        column_transform: Arc<Option<ColumnTransform>>,
502        column_types: Arc<[mz_pgrepr::Type]>,
503        params: CopyFormatParams<'static>,
504        skip_header_on_first_chunk: bool,
505        mut batch_rx: mpsc::Receiver<Vec<u8>>,
506    ) -> Result<(Vec<ProtoBatch>, u64), AdapterError> {
507        let persist_diagnostics = Diagnostics {
508            shard_name: collection_id.to_string(),
509            handle_purpose: "CopyFromStdin::batch_builder".to_string(),
510        };
511        let write_handle = persist_client
512            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
513                shard_id,
514                collection_desc,
515                Arc::new(UnitSchema),
516                persist_diagnostics,
517            )
518            .await
519            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist open: {e}")))?;
520
521        // Build a batch at the minimum timestamp. The coordinator will
522        // re-timestamp it during commit.
523        let lower = mz_repr::Timestamp::MIN;
524        let upper = Antichain::from_elem(lower.step_forward());
525        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
526        let mut row_count: u64 = 0;
527        let mut row_count_in_batch: u64 = 0;
528        let mut batch_bytes: usize = 0;
529        let mut proto_batches = Vec::new();
530
531        let mut is_first_chunk = true;
532        while let Some(raw_bytes) = batch_rx.recv().await {
533            // Decode raw bytes into rows. For the first chunk of worker 0,
534            // re-enable header skipping so the real CSV header line is skipped.
535            let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
536                let mut p = params.clone();
537                if let CopyFormatParams::Csv(ref mut csv) = p {
538                    csv.header = true;
539                }
540                p
541            } else {
542                params.clone()
543            };
544            is_first_chunk = false;
545            let rows = mz_pgcopy::decode_copy_format(&raw_bytes, &column_types, chunk_params)
546                .map_err(|e| AdapterError::CopyFormatError(e.to_string()))?;
547
548            for row in rows {
549                // Apply column transform if needed (add defaults, reorder).
550                let full_row = if let Some(ref transform) = *column_transform {
551                    transform.apply(&row)
552                } else {
553                    row
554                };
555
556                // Check constraints.
557                for (i, datum) in full_row.iter().enumerate() {
558                    target_desc.constraints_met(i, &datum).map_err(|e| {
559                        AdapterError::Unstructured(anyhow::anyhow!("constraint violation: {e}"))
560                    })?;
561                }
562
563                let data = SourceData(Ok(full_row));
564                batch_builder
565                    .add(&data, &(), &lower, &1)
566                    .await
567                    .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist add: {e}")))?;
568                row_count += 1;
569                row_count_in_batch += 1;
570            }
571
572            batch_bytes = batch_bytes.saturating_add(raw_bytes.len());
573            if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
574                let batch = batch_builder.finish(upper.clone()).await.map_err(|e| {
575                    AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}"))
576                })?;
577                proto_batches.push(batch.into_transmittable_batch());
578
579                batch_builder = write_handle.builder(Antichain::from_elem(lower));
580                row_count_in_batch = 0;
581                batch_bytes = 0;
582            }
583        }
584
585        if row_count_in_batch > 0 || proto_batches.is_empty() {
586            let batch = batch_builder
587                .finish(upper)
588                .await
589                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}")))?;
590            proto_batches.push(batch.into_transmittable_batch());
591        }
592
593        Ok((proto_batches, row_count))
594    }
595
596    pub(crate) fn commit_staged_batches(
597        &mut self,
598        conn_id: ConnectionId,
599        table_id: CatalogItemId,
600        batches: Vec<Result<ProtoBatch, String>>,
601    ) {
602        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
603            // Getting a successful response for a cancel COPY FROM is unexpected.
604            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
605            return;
606        };
607
608        let ActiveCopyFrom {
609            ingestion_id,
610            cluster_id: _,
611            table_id: _,
612            mut ctx,
613        } = active_copy;
614        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
615
616        let mut all_batches = SmallVec::with_capacity(batches.len());
617        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
618        let mut row_count = 0u64;
619
620        for maybe_batch in batches {
621            match maybe_batch {
622                Ok(batch) => {
623                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
624                    all_batches.push(batch);
625                    row_count = row_count.saturating_add(count);
626                }
627                Err(err) => all_errors.push(err),
628            }
629        }
630
631        // If we got any errors we need to fail the whole operation.
632        if let Some(error) = all_errors.pop() {
633            tracing::warn!(?error, ?all_errors, "failed COPY FROM");
634
635            // TODO(cf1): Cleanup the existing ProtoBatches to prevent leaking them.
636            // TODO(cf2): Carry structured errors all the way through.
637
638            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
639                "COPY FROM: {error}"
640            ))));
641
642            return;
643        }
644
645        // Stage a WriteOp, then when the Session is retired we complete the
646        // transaction, which handles acquiring the write lock for `table_id`,
647        // advancing the timestamps of the staged batches, and waiting for
648        // everything to complete before sending a response to the client.
649        let stage_write = ctx
650            .session_mut()
651            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
652                id: table_id,
653                rows: TableData::Batches(all_batches),
654            }]));
655
656        if let Err(err) = stage_write {
657            ctx.retire(Err(err));
658        } else {
659            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
660        }
661    }
662
663    /// Cancel any active `COPY FROM` statements/oneshot ingestions.
664    #[mz_ore::instrument(level = "debug")]
665    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
666        if let Some(ActiveCopyFrom {
667            ingestion_id,
668            cluster_id: _,
669            table_id: _,
670            ctx,
671        }) = self.active_copies.remove(conn_id)
672        {
673            let cancel_result = self
674                .controller
675                .storage
676                .cancel_oneshot_ingestion(ingestion_id);
677            if let Err(err) = cancel_result {
678                tracing::error!(?err, "failed to cancel OneshotIngestion");
679            }
680
681            ctx.retire(Err(AdapterError::Canceled));
682        }
683    }
684}
685
/// Describes how to transform a partial row (with only specified columns)
/// into a full row matching the table schema.
///
/// Built once per COPY FROM STDIN whose column list is not simply every table
/// column in order, then applied to every decoded input row via
/// [`ColumnTransform::apply`].
struct ColumnTransform {
    /// For each column in the target table, where to get the value.
    ///
    /// Holds one entry per target column, in table column order, so the
    /// length of this vector is the arity of the rows `apply` produces.
    sources: Vec<ColumnSource>,
    /// Pre-computed default values for columns not in the COPY column list.
    /// Packed as a Row; indexed by the `Default(idx)` variant.
    defaults_row: Row,
}
695
/// Where the value of a single target-table column comes from when a
/// [`ColumnTransform`] assembles a full row.
enum ColumnSource {
    /// Take the value from the input row at this position.
    ///
    /// The position is an index into the datums of the decoded input row.
    Input(usize),
    /// Use the pre-computed default at this index in `defaults_row`.
    Default(usize),
}
702
703impl ColumnTransform {
704    /// Apply the transform to produce a full row from a partial input row.
705    fn apply(&self, input: &Row) -> Row {
706        let input_datums: Vec<Datum> = input.unpack();
707        let default_datums: Vec<Datum> = self.defaults_row.unpack();
708        let mut output_datums = Vec::with_capacity(self.sources.len());
709        for source in &self.sources {
710            match source {
711                ColumnSource::Input(idx) => output_datums.push(input_datums[*idx]),
712                ColumnSource::Default(idx) => output_datums.push(default_datums[*idx]),
713            }
714        }
715        Row::pack(&output_datums)
716    }
717}