mz_adapter/coord/sequencer/inner/
copy_from.rs1use std::str::FromStr;
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_ore::cast::CastInto;
14use mz_persist_client::batch::ProtoBatch;
15use mz_pgcopy::CopyFormatParams;
16use mz_repr::{CatalogItemId, Datum, RowArena};
17use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_storage_client::client::TableData;
20use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
21use smallvec::SmallVec;
22use url::Url;
23use uuid::Uuid;
24
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
27use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
28use crate::session::{TransactionOps, WriteOp};
29use crate::{AdapterError, ExecuteContext, ExecuteResponse};
30
31impl Coordinator {
32 pub(crate) async fn sequence_copy_from(
33 &mut self,
34 ctx: ExecuteContext,
35 plan: plan::CopyFromPlan,
36 target_cluster: TargetCluster,
37 ) {
38 let plan::CopyFromPlan {
39 target_name: _,
40 target_id,
41 source,
42 columns: _,
43 source_desc,
44 mfp,
45 params,
46 filter,
47 } = plan;
48
49 let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
50 let style = ExprPrepStyle::OneShot {
51 logical_time: EvalTime::NotAvailable,
52 session: ctx.session(),
53 catalog_state: self.catalog().state(),
54 };
55 let mut from = from.lower_uncorrelated()?;
56 prep_scalar_expr(&mut from, style)?;
57
58 let temp_storage = RowArena::new();
61 let eval_result = from.eval(&[], &temp_storage)?;
62 let eval_string = match eval_result {
63 Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
64 Datum::String(url_str) => url_str,
65 other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
66 };
67
68 Ok(eval_string.to_string())
69 };
70
71 let Some(dest_table) = self.catalog().get_entry(&target_id).table() else {
73 let typ = self.catalog().get_entry(&target_id).item().typ();
74 let msg = format!("programming error: expected a Table found {typ:?}");
75 return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
76 };
77
78 let ingestion_id = Uuid::new_v4();
80 let collection_id = dest_table.global_id_writes();
81
82 let format = match params {
83 CopyFormatParams::Csv(csv) => {
84 mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
85 }
86 CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
87 CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
88 mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
89 ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
90 return;
91 }
92 };
93
94 let source = match source {
95 CopyFromSource::Url(from_expr) => {
96 let url = return_if_err!(eval_uri(from_expr), ctx);
97 let result = Url::parse(&url)
99 .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
100 let url = return_if_err!(result, ctx);
101
102 mz_storage_types::oneshot_sources::ContentSource::Http { url }
103 }
104 CopyFromSource::AwsS3 {
105 uri,
106 connection,
107 connection_id,
108 } => {
109 let uri = return_if_err!(eval_uri(uri), ctx);
110
111 let result = http::Uri::from_str(&uri)
116 .map_err(|err| {
117 AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
118 })
119 .and_then(|uri| {
120 if uri.scheme_str() != Some("s3") {
121 coord_bail!("only 's3://...' urls are supported as COPY FROM target");
122 }
123 Ok(uri)
124 })
125 .and_then(|uri| {
126 if uri.host().is_none() {
127 coord_bail!("missing bucket name from 's3://...' url");
128 }
129 Ok(uri)
130 });
131 let uri = return_if_err!(result, ctx);
132
133 mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
134 connection,
135 connection_id,
136 uri: uri.to_string(),
137 }
138 }
139 CopyFromSource::Stdin => {
140 unreachable!("COPY FROM STDIN should be handled elsewhere")
141 }
142 };
143
144 let filter = match filter {
145 None => mz_storage_types::oneshot_sources::ContentFilter::None,
146 Some(CopyFromFilter::Files(files)) => {
147 mz_storage_types::oneshot_sources::ContentFilter::Files(files)
148 }
149 Some(CopyFromFilter::Pattern(pattern)) => {
150 mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
151 }
152 };
153
154 let source_mfp = mfp
155 .into_plan()
156 .map_err(|s| AdapterError::internal("copy_from", s))
157 .and_then(|mfp| {
158 mfp.into_nontemporal().map_err(|_| {
159 AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
160 })
161 });
162 let source_mfp = return_if_err!(source_mfp, ctx);
163 let shape = ContentShape {
164 source_desc,
165 source_mfp,
166 };
167
168 let request = OneshotIngestionRequest {
169 source,
170 format,
171 filter,
172 shape,
173 };
174
175 let target_cluster = match self
176 .catalog()
177 .resolve_target_cluster(target_cluster, ctx.session())
178 {
179 Ok(cluster) => cluster,
180 Err(err) => {
181 return ctx.retire(Err(err));
182 }
183 };
184 let cluster_id = target_cluster.id;
185
186 let command_tx = self.internal_cmd_tx.clone();
189 let conn_id = ctx.session().conn_id().clone();
190 let closure = Box::new(move |batches| {
191 let _ = command_tx.send(crate::coord::Message::StagedBatches {
192 conn_id,
193 table_id: target_id,
194 batches,
195 });
196 });
197 let conn_id = ctx.session().conn_id().clone();
199 self.active_copies.insert(
200 conn_id,
201 ActiveCopyFrom {
202 ingestion_id,
203 cluster_id,
204 table_id: target_id,
205 ctx,
206 },
207 );
208
209 let _result = self
210 .controller
211 .storage
212 .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
213 .await;
214 }
215
216 pub(crate) fn commit_staged_batches(
217 &mut self,
218 conn_id: ConnectionId,
219 table_id: CatalogItemId,
220 batches: Vec<Result<ProtoBatch, String>>,
221 ) {
222 let Some(active_copy) = self.active_copies.remove(&conn_id) else {
223 tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
225 return;
226 };
227
228 let ActiveCopyFrom {
229 ingestion_id,
230 cluster_id: _,
231 table_id: _,
232 mut ctx,
233 } = active_copy;
234 tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
235
236 let mut all_batches = SmallVec::with_capacity(batches.len());
237 let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
238 let mut row_count = 0u64;
239
240 for maybe_batch in batches {
241 match maybe_batch {
242 Ok(batch) => {
243 let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
244 all_batches.push(batch);
245 row_count = row_count.saturating_add(count);
246 }
247 Err(err) => all_errors.push(err),
248 }
249 }
250
251 if let Some(error) = all_errors.pop() {
253 tracing::warn!(?error, ?all_errors, "failed COPY FROM");
254
255 ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
259 "COPY FROM: {error}"
260 ))));
261
262 return;
263 }
264
265 let stage_write = ctx
270 .session_mut()
271 .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
272 id: table_id,
273 rows: TableData::Batches(all_batches),
274 }]));
275
276 if let Err(err) = stage_write {
277 ctx.retire(Err(err));
278 } else {
279 ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
280 }
281 }
282
283 #[mz_ore::instrument(level = "debug")]
285 pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
286 if let Some(ActiveCopyFrom {
287 ingestion_id,
288 cluster_id: _,
289 table_id: _,
290 ctx,
291 }) = self.active_copies.remove(conn_id)
292 {
293 let cancel_result = self
294 .controller
295 .storage
296 .cancel_oneshot_ingestion(ingestion_id);
297 if let Err(err) = cancel_result {
298 tracing::error!(?err, "failed to cancel OneshotIngestion");
299 }
300
301 ctx.retire(Err(AdapterError::Canceled));
302 }
303 }
304}