Skip to main content

mz_adapter/coord/sequencer/inner/
copy_from.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_adapter_types::connection::ConnectionId;
14use mz_ore::cast::CastInto;
15use mz_persist_client::Diagnostics;
16use mz_persist_client::batch::ProtoBatch;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_pgcopy::CopyFormatParams;
19use mz_repr::{CatalogItemId, ColumnIndex, Datum, RelationDesc, Row, RowArena};
20use mz_sql::catalog::SessionCatalog;
21use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
22use mz_sql::session::metadata::SessionMetadata;
23use mz_storage_client::client::TableData;
24use mz_storage_types::StorageDiff;
25use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
26use mz_storage_types::sources::SourceData;
27use smallvec::SmallVec;
28use timely::progress::Antichain;
29use tokio::sync::{mpsc, oneshot};
30use url::Url;
31use uuid::Uuid;
32
33use crate::command::CopyFromStdinWriter;
34use crate::coord::sequencer::inner::return_if_err;
35use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
36use crate::optimize;
37use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
38use crate::session::{Session, TransactionOps, WriteOp};
39use crate::{AdapterError, ExecuteContext, ExecuteResponse};
40
/// Finalize persist batches periodically during COPY FROM STDIN to avoid
/// unbounded in-memory growth in a single giant batch.
///
/// Note: the threshold is compared against the *raw input* bytes fed to a
/// worker, not the encoded size of the persist batch itself.
const COPY_FROM_STDIN_MAX_BATCH_BYTES: usize = 32 * 1024 * 1024;
44
45impl Coordinator {
    /// Sequences a `COPY <table> FROM <url | s3-uri>` statement.
    ///
    /// Evaluates the source expression to a URL/URI, validates it, and asks
    /// the storage controller to run a "oneshot ingestion" on the resolved
    /// cluster. The ingestion stages persist batches off the coordinator;
    /// when they are ready, a `Message::StagedBatches` command routes them
    /// back to [`Coordinator::commit_staged_batches`].
    ///
    /// The `ctx` is stashed in `self.active_copies` so the in-flight COPY can
    /// be canceled (see [`Coordinator::cancel_pending_copy`]).
    ///
    /// `COPY FROM STDIN` is handled elsewhere and must never reach this
    /// method.
    pub(crate) async fn sequence_copy_from(
        &mut self,
        ctx: ExecuteContext,
        plan: plan::CopyFromPlan,
        target_cluster: TargetCluster,
    ) {
        let plan::CopyFromPlan {
            target_name: _,
            target_id,
            source,
            columns: _,
            source_desc,
            mfp,
            params,
            filter,
        } = plan;

        // Lower, prepare, and eagerly evaluate a scalar expression (the URL
        // or S3 URI in `COPY ... FROM '<target>'`) down to a single string.
        let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
            let style = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: ctx.session(),
                catalog_state: self.catalog().state(),
            };
            let mut from = from.lower_uncorrelated(self.catalog().state().system_config())?;
            style.prep_scalar_expr(&mut from)?;

            // TODO(cf3): Add structured errors for the below uses of `coord_bail!`
            // and AdapterError::Unstructured.
            let temp_storage = RowArena::new();
            let eval_result = from.eval(&[], &temp_storage)?;
            let eval_string = match eval_result {
                Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
                Datum::String(url_str) => url_str,
                other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
            };

            Ok(eval_string.to_string())
        };

        // We check in planning that we're copying into a Table, but be defensive.
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return ctx.retire(Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            }));
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            let msg = format!("programming error: expected a Table found {typ:?}");
            return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
        };

        // Generate a unique UUID for our ingestion.
        let ingestion_id = Uuid::new_v4();
        let collection_id = dest_table.global_id_writes();

        // Only CSV and Parquet are supported for URL/S3 sources; planning is
        // expected to reject the rest, so reaching the fallback arm is a bug.
        let format = match params {
            CopyFormatParams::Csv(csv) => {
                mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
            }
            CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
            CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
                mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
                ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL/S3 format")));
                return;
            }
        };

        let source = match source {
            CopyFromSource::Url(from_expr) => {
                let url = return_if_err!(eval_uri(from_expr), ctx);
                // TODO(cf2): Structured errors.
                let result = Url::parse(&url)
                    .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
                let url = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::Http { url }
            }
            CopyFromSource::AwsS3 {
                uri,
                connection,
                connection_id,
            } => {
                let uri = return_if_err!(eval_uri(uri), ctx);

                // Validate the URI is an S3 URI, with a bucket name. We rely on validating here
                // and expect it in clusterd.
                //
                // TODO(cf2): Structured errors.
                let result = http::Uri::from_str(&uri)
                    .map_err(|err| {
                        AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
                    })
                    .and_then(|uri| {
                        if uri.scheme_str() != Some("s3") {
                            coord_bail!("only 's3://...' urls are supported as COPY FROM target");
                        }
                        Ok(uri)
                    })
                    .and_then(|uri| {
                        if uri.host().is_none() {
                            coord_bail!("missing bucket name from 's3://...' url");
                        }
                        Ok(uri)
                    });
                let uri = return_if_err!(result, ctx);

                mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
                    connection,
                    connection_id,
                    uri: uri.to_string(),
                }
            }
            CopyFromSource::Stdin => {
                unreachable!("COPY FROM STDIN should be handled elsewhere")
            }
        };

        // Translate the planned file filter (FILES (...) / PATTERN '...')
        // into the storage-layer representation.
        let filter = match filter {
            None => mz_storage_types::oneshot_sources::ContentFilter::None,
            Some(CopyFromFilter::Files(files)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Files(files)
            }
            Some(CopyFromFilter::Pattern(pattern)) => {
                mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
            }
        };

        // The MFP maps decoded source rows onto the table's schema; it must
        // be non-temporal since there is no query timestamp to evaluate at.
        let source_mfp = mfp
            .into_plan()
            .map_err(|s| AdapterError::internal("copy_from", s))
            .and_then(|mfp| {
                mfp.into_nontemporal().map_err(|_| {
                    AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
                })
            });
        let source_mfp = return_if_err!(source_mfp, ctx);

        let shape = ContentShape {
            source_desc,
            source_mfp,
        };

        let request = OneshotIngestionRequest {
            source,
            format,
            filter,
            shape,
        };

        let target_cluster = match self
            .catalog()
            .resolve_target_cluster(target_cluster, ctx.session())
        {
            Ok(cluster) => cluster,
            Err(err) => {
                return ctx.retire(Err(err));
            }
        };
        let cluster_id = target_cluster.id;

        // When we finish staging the Batches in Persist, we'll send a command
        // to the Coordinator.
        let command_tx = self.internal_cmd_tx.clone();
        let conn_id = ctx.session().conn_id().clone();
        let closure = Box::new(move |batches| {
            let _ = command_tx.send(crate::coord::Message::StagedBatches {
                conn_id,
                table_id: target_id,
                batches,
            });
        });
        // Stash the execute context so we can cancel the COPY.
        let conn_id = ctx.session().conn_id().clone();
        self.active_copies.insert(
            conn_id,
            ActiveCopyFrom {
                ingestion_id,
                cluster_id,
                table_id: target_id,
                ctx,
            },
        );

        // NOTE(review): the result is discarded, and the `ctx` above has
        // already been stashed in `active_copies` — presumably failures are
        // reported back via `StagedBatches` or cleaned up by cancelation, but
        // confirm a synchronous error here cannot strand the waiting client.
        let _result = self
            .controller
            .storage
            .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
            .await;
    }
236
    /// Sets up a streaming COPY FROM STDIN operation.
    ///
    /// Spawns N parallel background batch builder tasks that each receive
    /// raw byte chunks, decode them, apply column defaults/reordering,
    /// and build persist batches. Returns a [`CopyFromStdinWriter`] for
    /// pgwire to distribute raw byte chunks across the workers.
    ///
    /// # Errors
    ///
    /// Returns an error if the target table was concurrently dropped, if the
    /// catalog entry is not actually a table, if the persist collection
    /// metadata cannot be fetched, or if pre-computing the default-column
    /// values fails.
    pub(crate) fn setup_copy_from_stdin(
        &self,
        session: &Session,
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        row_desc: RelationDesc,
        params: CopyFormatParams<'static>,
    ) -> Result<CopyFromStdinWriter, AdapterError> {
        // Look up the table and its persist shard metadata.
        let Some(entry) = self.catalog().try_get_entry(&target_id) else {
            return Err(AdapterError::ConcurrentDependencyDrop {
                dependency_kind: "table",
                dependency_id: target_id.to_string(),
            });
        };
        let Some(dest_table) = entry.table() else {
            let typ = entry.item().typ();
            return Err(AdapterError::Unstructured(anyhow::anyhow!(
                "programming error: expected a Table found {typ:?}"
            )));
        };
        let collection_id = dest_table.global_id_writes();

        let collection_meta = self
            .controller
            .storage
            .collection_metadata(collection_id)
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("{e}")))?;
        let shard_id = collection_meta.data_shard;
        let collection_desc = collection_meta.relation_desc.clone();

        // Pre-compute the column transformation.
        let pcx = session.pcx().clone();
        let session_meta = session.meta();
        let catalog = self.catalog().clone();
        let conn_catalog = catalog.for_session(session);
        let catalog_state = conn_catalog.state();
        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());

        // Determine if we need column rewriting (defaults/reordering).
        let target_desc = catalog
            .try_get_entry(&target_id)
            .expect("table must exist")
            .relation_desc_latest()
            .expect("table has desc")
            .into_owned();
        let all_columns_in_order = columns.len() == target_desc.arity()
            && columns.iter().enumerate().all(|(i, c)| c.to_raw() == i);

        // If we need column rewriting, pre-compute the transform by running
        // plan_copy_from with a single dummy row through the optimizer.
        let column_transform = if all_columns_in_order {
            None
        } else {
            // Plan/optimize a COPY of a single all-NULL row; the resulting
            // constant row carries the evaluated default expression for every
            // column that is *not* in the COPY column list.
            let dummy_datums: Vec<Datum> = columns.iter().map(|_| Datum::Null).collect();
            let dummy_row = Row::pack(&dummy_datums);

            let prep = ExprPrepOneShot {
                logical_time: EvalTime::NotAvailable,
                session: &session_meta,
                catalog_state,
            };
            let mut optimizer = optimize::view::Optimizer::new_with_prep_no_limit(
                optimizer_config.clone(),
                None,
                prep,
            );

            let hir = mz_sql::plan::plan_copy_from(
                &pcx,
                &conn_catalog,
                target_id,
                target_name.clone(),
                columns.clone(),
                vec![dummy_row],
            )?;
            let mir = optimize::Optimize::optimize(&mut optimizer, hir)?;
            let mir_expr = mir.into_inner();
            let (result_ref, _) = mir_expr
                .as_const()
                .expect("optimizer should produce constant");
            let result_rows = result_ref
                .clone()
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("eval error: {e}")))?;

            let (full_row, _) = result_rows.into_iter().next().expect("should have one row");
            let full_datums: Vec<Datum> = full_row.unpack();

            // Map each target column back to its position in the COPY input
            // (if listed); everything else takes a pre-computed default.
            let col_to_source: std::collections::BTreeMap<ColumnIndex, usize> =
                columns.iter().enumerate().map(|(a, b)| (*b, a)).collect();

            let mut sources: Vec<ColumnSource> = Vec::with_capacity(target_desc.arity());
            let mut default_datums: Vec<Datum> = Vec::new();

            for i in 0..target_desc.arity() {
                let col_idx = ColumnIndex::from_raw(i);
                if let Some(&src_idx) = col_to_source.get(&col_idx) {
                    sources.push(ColumnSource::Input(src_idx));
                } else {
                    sources.push(ColumnSource::Default(default_datums.len()));
                    default_datums.push(full_datums[i]);
                }
            }

            let defaults_row = Row::pack(&default_datums);

            Some(ColumnTransform {
                sources,
                defaults_row,
            })
        };

        // Compute column types for decoding (same logic as pgwire used to do).
        let column_types: Arc<[mz_pgrepr::Type]> = row_desc
            .typ()
            .column_types
            .iter()
            .map(|x| &x.scalar_type)
            .map(mz_pgrepr::Type::from)
            .collect::<Vec<_>>()
            .into();

        // Determine number of parallel workers.
        let num_workers = std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(1);
        tracing::info!(
            %target_id, num_workers,
            "starting parallel COPY FROM STDIN batch builders"
        );

        // Shared state across workers.
        let column_transform = Arc::new(column_transform);
        let target_desc = Arc::new(target_desc);
        let collection_desc = Arc::new(collection_desc);
        let persist_client = self.persist_client.clone();

        // Create per-worker channels and spawn workers on blocking threads.
        // Each worker does CPU-intensive TSV decoding + columnar encoding,
        // so they need dedicated OS threads (not tokio async tasks) for
        // true parallelism.
        let rt_handle = tokio::runtime::Handle::current();
        let mut batch_txs = Vec::with_capacity(num_workers);
        let mut worker_handles = Vec::with_capacity(num_workers);

        // When COPY FROM uses CSV with HEADER, only the very first chunk in
        // the stream contains the real header line. The pgwire handler splits
        // data into ~32MB chunks distributed round-robin across workers, so
        // subsequent chunks' first rows are data, not headers. We must only
        // skip the header on the first chunk of worker 0.
        let first_chunk_has_header = params.requires_header();
        let mut worker_params = params;
        if let CopyFormatParams::Csv(ref mut csv) = worker_params {
            csv.header = false;
        }

        for worker_id in 0..num_workers {
            // Keep in-flight buffering tight: at most one chunk queued per
            // worker in addition to the currently-processed chunk.
            let (batch_tx, batch_rx) = mpsc::channel::<Vec<u8>>(1);
            batch_txs.push(batch_tx);

            let persist_client = persist_client.clone();
            let column_types = Arc::clone(&column_types);
            let column_transform = Arc::clone(&column_transform);
            let target_desc = Arc::clone(&target_desc);
            let collection_desc = Arc::clone(&collection_desc);
            let params = worker_params.clone();
            // Only worker 0 receives the first chunk (round-robin), so only
            // it needs to skip the CSV header on its first chunk.
            let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
            let rt = rt_handle.clone();

            let handle = mz_ore::task::spawn_blocking(
                || format!("copy_from_stdin_worker:{target_id}:{worker_id}"),
                move || {
                    // The builder is async (persist I/O), so drive it to
                    // completion on the shared runtime from this dedicated
                    // blocking thread.
                    rt.block_on(Self::copy_from_stdin_batch_builder(
                        persist_client,
                        shard_id,
                        collection_id,
                        collection_desc,
                        target_desc,
                        column_transform,
                        column_types,
                        params,
                        skip_header_on_first_chunk,
                        batch_rx,
                    ))
                },
            );
            worker_handles.push(handle);
        }

        // Spawn a collector task that waits for all workers.
        let (completion_tx, completion_rx) = oneshot::channel();
        mz_ore::task::spawn(
            || format!("copy_from_stdin_collector:{target_id}"),
            async move {
                let mut all_batches = Vec::with_capacity(num_workers);
                let mut total_rows: u64 = 0;

                for handle in worker_handles {
                    match handle.await {
                        Ok((proto_batches, count)) => {
                            all_batches.extend(proto_batches);
                            total_rows += count;
                        }
                        Err(e) => {
                            // First failure wins; remaining workers are not
                            // awaited. The send only fails if the receiver
                            // was dropped, which is fine to ignore.
                            let _ = completion_tx.send(Err(e));
                            return;
                        }
                    }
                }

                let _ = completion_tx.send(Ok((all_batches, total_rows)));
            },
        );

        Ok(CopyFromStdinWriter {
            batch_txs,
            completion_rx,
        })
    }
467
    /// Background task: receives raw byte chunks, decodes rows, and builds
    /// persist batches. One instance runs per parallel worker.
    ///
    /// Returns the finished (transmittable) persist batches together with
    /// the total number of rows this worker decoded and staged.
    ///
    /// All updates are written at `Timestamp::MIN`; the coordinator
    /// re-timestamps the batches when the transaction commits.
    async fn copy_from_stdin_batch_builder(
        persist_client: mz_persist_client::PersistClient,
        shard_id: mz_persist_client::ShardId,
        collection_id: mz_repr::GlobalId,
        collection_desc: Arc<RelationDesc>,
        target_desc: Arc<RelationDesc>,
        column_transform: Arc<Option<ColumnTransform>>,
        column_types: Arc<[mz_pgrepr::Type]>,
        params: CopyFormatParams<'static>,
        skip_header_on_first_chunk: bool,
        mut batch_rx: mpsc::Receiver<Vec<u8>>,
    ) -> Result<(Vec<ProtoBatch>, u64), AdapterError> {
        let persist_diagnostics = Diagnostics {
            shard_name: collection_id.to_string(),
            handle_purpose: "CopyFromStdin::batch_builder".to_string(),
        };
        let write_handle = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                collection_desc,
                Arc::new(UnitSchema),
                persist_diagnostics,
            )
            .await
            .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist open: {e}")))?;

        // Build a batch at the minimum timestamp. The coordinator will
        // re-timestamp it during commit.
        let lower = mz_repr::Timestamp::MIN;
        let upper = Antichain::from_elem(lower.step_forward());
        let mut batch_builder = write_handle.builder(Antichain::from_elem(lower));
        // Total rows staged by this worker across all batches.
        let mut row_count: u64 = 0;
        // Rows staged in the *current* (unfinished) batch builder.
        let mut row_count_in_batch: u64 = 0;
        // Raw input bytes fed to the current batch; drives the decision to
        // finalize it (see COPY_FROM_STDIN_MAX_BATCH_BYTES).
        let mut batch_bytes: usize = 0;
        let mut proto_batches = Vec::new();

        let mut is_first_chunk = true;
        while let Some(raw_bytes) = batch_rx.recv().await {
            // Decode raw bytes into rows. For the first chunk of worker 0,
            // re-enable header skipping so the real CSV header line is skipped.
            let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
                let mut p = params.clone();
                if let CopyFormatParams::Csv(ref mut csv) = p {
                    csv.header = true;
                }
                p
            } else {
                params.clone()
            };
            is_first_chunk = false;
            let rows = mz_pgcopy::decode_copy_format(&raw_bytes, &column_types, chunk_params)
                .map_err(|e| AdapterError::CopyFormatError(e.to_string()))?;

            for row in rows {
                // Apply column transform if needed (add defaults, reorder).
                let full_row = if let Some(ref transform) = *column_transform {
                    transform.apply(&row)
                } else {
                    row
                };

                // Check constraints.
                for (i, datum) in full_row.iter().enumerate() {
                    target_desc.constraints_met(i, &datum).map_err(|e| {
                        AdapterError::Unstructured(anyhow::anyhow!("constraint violation: {e}"))
                    })?;
                }

                let data = SourceData(Ok(full_row));
                batch_builder
                    .add(&data, &(), &lower, &1)
                    .await
                    .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist add: {e}")))?;
                row_count += 1;
                row_count_in_batch += 1;
            }

            // Periodically finalize the in-progress batch so memory use stays
            // bounded regardless of the total COPY size.
            batch_bytes = batch_bytes.saturating_add(raw_bytes.len());
            if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
                let batch = batch_builder.finish(upper.clone()).await.map_err(|e| {
                    AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}"))
                })?;
                proto_batches.push(batch.into_transmittable_batch());

                batch_builder = write_handle.builder(Antichain::from_elem(lower));
                row_count_in_batch = 0;
                batch_bytes = 0;
            }
        }

        // Finish the trailing partial batch. Also emit an (empty) batch when
        // no rows were produced at all, so callers always see at least one.
        if row_count_in_batch > 0 || proto_batches.is_empty() {
            let batch = batch_builder
                .finish(upper)
                .await
                .map_err(|e| AdapterError::Unstructured(anyhow::anyhow!("persist finish: {e}")))?;
            proto_batches.push(batch.into_transmittable_batch());
        }

        Ok((proto_batches, row_count))
    }
570
    /// Handles the staged persist batches produced by a `COPY FROM` oneshot
    /// ingestion (delivered via the coordinator's `StagedBatches` message).
    ///
    /// On success the batches are staged as a table write on the session's
    /// transaction — actually committed when the session/context retires —
    /// and the client is told how many rows were copied. If *any* batch
    /// failed, the whole COPY fails.
    pub(crate) fn commit_staged_batches(
        &mut self,
        conn_id: ConnectionId,
        table_id: CatalogItemId,
        batches: Vec<Result<ProtoBatch, String>>,
    ) {
        let Some(active_copy) = self.active_copies.remove(&conn_id) else {
            // Getting a successful response for a cancel COPY FROM is unexpected.
            tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
            return;
        };

        let ActiveCopyFrom {
            ingestion_id,
            cluster_id: _,
            table_id: _,
            mut ctx,
        } = active_copy;
        tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");

        let mut all_batches = SmallVec::with_capacity(batches.len());
        let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
        let mut row_count = 0u64;

        // Partition the results into successful batches (summing their
        // update counts) and errors.
        for maybe_batch in batches {
            match maybe_batch {
                Ok(batch) => {
                    let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
                    all_batches.push(batch);
                    row_count = row_count.saturating_add(count);
                }
                Err(err) => all_errors.push(err),
            }
        }

        // If we got any errors we need to fail the whole operation.
        if let Some(error) = all_errors.pop() {
            // `error` is the one surfaced to the client; any remaining
            // errors are only logged here.
            tracing::warn!(?error, ?all_errors, "failed COPY FROM");

            // TODO(cf1): Cleanup the existing ProtoBatches to prevent leaking them.
            // TODO(cf2): Carry structured errors all the way through.

            ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
                "COPY FROM: {error}"
            ))));

            return;
        }

        // Stage a WriteOp, then when the Session is retired we complete the
        // transaction, which handles acquiring the write lock for `table_id`,
        // advancing the timestamps of the staged batches, and waiting for
        // everything to complete before sending a response to the client.
        let stage_write = ctx
            .session_mut()
            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
                id: table_id,
                rows: TableData::Batches(all_batches),
            }]));

        if let Err(err) = stage_write {
            ctx.retire(Err(err));
        } else {
            ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
        }
    }
637
638    /// Cancel any active `COPY FROM` statements/oneshot ingestions.
639    #[mz_ore::instrument(level = "debug")]
640    pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
641        if let Some(ActiveCopyFrom {
642            ingestion_id,
643            cluster_id: _,
644            table_id: _,
645            ctx,
646        }) = self.active_copies.remove(conn_id)
647        {
648            let cancel_result = self
649                .controller
650                .storage
651                .cancel_oneshot_ingestion(ingestion_id);
652            if let Err(err) = cancel_result {
653                tracing::error!(?err, "failed to cancel OneshotIngestion");
654            }
655
656            ctx.retire(Err(AdapterError::Canceled));
657        }
658    }
659}
660
/// Describes how to transform a partial row (with only specified columns)
/// into a full row matching the table schema.
///
/// Built once per COPY FROM STDIN and shared (via `Arc`) across all
/// batch-builder workers; applied to every decoded input row.
struct ColumnTransform {
    /// For each column in the target table, where to get the value.
    sources: Vec<ColumnSource>,
    /// Pre-computed default values for columns not in the COPY column list.
    /// Packed as a Row; indexed by the `Default(idx)` variant.
    defaults_row: Row,
}
670
/// Where a single output column of a [`ColumnTransform`] comes from.
enum ColumnSource {
    /// Take the value from the input row at this position.
    Input(usize),
    /// Use the pre-computed default at this index in `defaults_row`.
    Default(usize),
}
677
678impl ColumnTransform {
679    /// Apply the transform to produce a full row from a partial input row.
680    fn apply(&self, input: &Row) -> Row {
681        let input_datums: Vec<Datum> = input.unpack();
682        let default_datums: Vec<Datum> = self.defaults_row.unpack();
683        let mut output_datums = Vec::with_capacity(self.sources.len());
684        for source in &self.sources {
685            match source {
686                ColumnSource::Input(idx) => output_datums.push(input_datums[*idx]),
687                ColumnSource::Default(idx) => output_datums.push(default_datums[*idx]),
688            }
689        }
690        Row::pack(&output_datums)
691    }
692}