mz_adapter/coord/sequencer/inner/
copy_from.rs1use std::str::FromStr;
11
12use mz_adapter_types::connection::ConnectionId;
13use mz_ore::cast::CastInto;
14use mz_persist_client::batch::ProtoBatch;
15use mz_pgcopy::CopyFormatParams;
16use mz_repr::{CatalogItemId, Datum, RowArena};
17use mz_sql::plan::{self, CopyFromFilter, CopyFromSource, HirScalarExpr};
18use mz_sql::session::metadata::SessionMetadata;
19use mz_storage_client::client::TableData;
20use mz_storage_types::oneshot_sources::{ContentShape, OneshotIngestionRequest};
21use smallvec::SmallVec;
22use url::Url;
23use uuid::Uuid;
24
25use crate::coord::sequencer::inner::return_if_err;
26use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
27use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
28use crate::session::{TransactionOps, WriteOp};
29use crate::{AdapterError, ExecuteContext, ExecuteResponse};
30
31impl Coordinator {
32 pub(crate) async fn sequence_copy_from(
33 &mut self,
34 ctx: ExecuteContext,
35 plan: plan::CopyFromPlan,
36 target_cluster: TargetCluster,
37 ) {
38 let plan::CopyFromPlan {
39 id,
40 source,
41 columns: _,
42 source_desc,
43 mfp,
44 params,
45 filter,
46 } = plan;
47
48 let eval_uri = |from: HirScalarExpr| -> Result<String, AdapterError> {
49 let style = ExprPrepStyle::OneShot {
50 logical_time: EvalTime::NotAvailable,
51 session: ctx.session(),
52 catalog_state: self.catalog().state(),
53 };
54 let mut from = from.lower_uncorrelated()?;
55 prep_scalar_expr(&mut from, style)?;
56
57 let temp_storage = RowArena::new();
60 let eval_result = from.eval(&[], &temp_storage)?;
61 let eval_string = match eval_result {
62 Datum::Null => coord_bail!("COPY FROM target value cannot be NULL"),
63 Datum::String(url_str) => url_str,
64 other => coord_bail!("programming error! COPY FROM target cannot be {other}"),
65 };
66
67 Ok(eval_string.to_string())
68 };
69
70 let Some(dest_table) = self.catalog().get_entry(&id).table() else {
72 let typ = self.catalog().get_entry(&id).item().typ();
73 let msg = format!("programming error: expected a Table found {typ:?}");
74 return ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(msg))));
75 };
76
77 let ingestion_id = Uuid::new_v4();
79 let collection_id = dest_table.global_id_writes();
80
81 let format = match params {
82 CopyFormatParams::Csv(csv) => {
83 mz_storage_types::oneshot_sources::ContentFormat::Csv(csv.to_owned())
84 }
85 CopyFormatParams::Parquet => mz_storage_types::oneshot_sources::ContentFormat::Parquet,
86 CopyFormatParams::Text(_) | CopyFormatParams::Binary => {
87 mz_ore::soft_panic_or_log!("unsupported formats should be rejected in planning");
88 ctx.retire(Err(AdapterError::Unsupported("COPY FROM URL format")));
89 return;
90 }
91 };
92
93 let source = match source {
94 CopyFromSource::Url(from_expr) => {
95 let url = return_if_err!(eval_uri(from_expr), ctx);
96 let result = Url::parse(&url)
98 .map_err(|err| AdapterError::Unstructured(anyhow::anyhow!("{err}")));
99 let url = return_if_err!(result, ctx);
100
101 mz_storage_types::oneshot_sources::ContentSource::Http { url }
102 }
103 CopyFromSource::AwsS3 {
104 uri,
105 connection,
106 connection_id,
107 } => {
108 let uri = return_if_err!(eval_uri(uri), ctx);
109
110 let result = http::Uri::from_str(&uri)
115 .map_err(|err| {
116 AdapterError::Unstructured(anyhow::anyhow!("expected S3 uri: {err}"))
117 })
118 .and_then(|uri| {
119 if uri.scheme_str() != Some("s3") {
120 coord_bail!("only 's3://...' urls are supported as COPY FROM target");
121 }
122 Ok(uri)
123 })
124 .and_then(|uri| {
125 if uri.host().is_none() {
126 coord_bail!("missing bucket name from 's3://...' url");
127 }
128 Ok(uri)
129 });
130 let uri = return_if_err!(result, ctx);
131
132 mz_storage_types::oneshot_sources::ContentSource::AwsS3 {
133 connection,
134 connection_id,
135 uri: uri.to_string(),
136 }
137 }
138 CopyFromSource::Stdin => {
139 unreachable!("COPY FROM STDIN should be handled elsewhere")
140 }
141 };
142
143 let filter = match filter {
144 None => mz_storage_types::oneshot_sources::ContentFilter::None,
145 Some(CopyFromFilter::Files(files)) => {
146 mz_storage_types::oneshot_sources::ContentFilter::Files(files)
147 }
148 Some(CopyFromFilter::Pattern(pattern)) => {
149 mz_storage_types::oneshot_sources::ContentFilter::Pattern(pattern)
150 }
151 };
152
153 let source_mfp = mfp
154 .into_plan()
155 .map_err(|s| AdapterError::internal("copy_from", s))
156 .and_then(|mfp| {
157 mfp.into_nontemporal().map_err(|_| {
158 AdapterError::internal("copy_from", "temporal MFP not allowed in copy from")
159 })
160 });
161 let source_mfp = return_if_err!(source_mfp, ctx);
162 let shape = ContentShape {
163 source_desc,
164 source_mfp,
165 };
166
167 let request = OneshotIngestionRequest {
168 source,
169 format,
170 filter,
171 shape,
172 };
173
174 let target_cluster = match self
175 .catalog()
176 .resolve_target_cluster(target_cluster, ctx.session())
177 {
178 Ok(cluster) => cluster,
179 Err(err) => {
180 return ctx.retire(Err(err));
181 }
182 };
183 let cluster_id = target_cluster.id;
184
185 let command_tx = self.internal_cmd_tx.clone();
188 let conn_id = ctx.session().conn_id().clone();
189 let closure = Box::new(move |batches| {
190 let _ = command_tx.send(crate::coord::Message::StagedBatches {
191 conn_id,
192 table_id: id,
193 batches,
194 });
195 });
196 let conn_id = ctx.session().conn_id().clone();
198 self.active_copies.insert(
199 conn_id,
200 ActiveCopyFrom {
201 ingestion_id,
202 cluster_id,
203 table_id: id,
204 ctx,
205 },
206 );
207
208 let _result = self
209 .controller
210 .storage
211 .create_oneshot_ingestion(ingestion_id, collection_id, cluster_id, request, closure)
212 .await;
213 }
214
215 pub(crate) fn commit_staged_batches(
216 &mut self,
217 conn_id: ConnectionId,
218 table_id: CatalogItemId,
219 batches: Vec<Result<ProtoBatch, String>>,
220 ) {
221 let Some(active_copy) = self.active_copies.remove(&conn_id) else {
222 tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
224 return;
225 };
226
227 let ActiveCopyFrom {
228 ingestion_id,
229 cluster_id: _,
230 table_id: _,
231 mut ctx,
232 } = active_copy;
233 tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
234
235 let mut all_batches = SmallVec::with_capacity(batches.len());
236 let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
237 let mut row_count = 0u64;
238
239 for maybe_batch in batches {
240 match maybe_batch {
241 Ok(batch) => {
242 let count = batch.batch.as_ref().map(|b| b.len).unwrap_or(0);
243 all_batches.push(batch);
244 row_count = row_count.saturating_add(count);
245 }
246 Err(err) => all_errors.push(err),
247 }
248 }
249
250 if let Some(error) = all_errors.pop() {
252 tracing::warn!(?error, ?all_errors, "failed COPY FROM");
253
254 ctx.retire(Err(AdapterError::Unstructured(anyhow::anyhow!(
258 "COPY FROM: {error}"
259 ))));
260
261 return;
262 }
263
264 let stage_write = ctx
269 .session_mut()
270 .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
271 id: table_id,
272 rows: TableData::Batches(all_batches),
273 }]));
274
275 if let Err(err) = stage_write {
276 ctx.retire(Err(err));
277 } else {
278 ctx.retire(Ok(ExecuteResponse::Copied(row_count.cast_into())));
279 }
280 }
281
282 #[mz_ore::instrument(level = "debug")]
284 pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
285 if let Some(ActiveCopyFrom {
286 ingestion_id,
287 cluster_id: _,
288 table_id: _,
289 ctx,
290 }) = self.active_copies.remove(conn_id)
291 {
292 let cancel_result = self
293 .controller
294 .storage
295 .cancel_oneshot_ingestion(ingestion_id);
296 if let Err(err) = cancel_result {
297 tracing::error!(?err, "failed to cancel OneshotIngestion");
298 }
299
300 ctx.retire(Err(AdapterError::Canceled));
301 }
302 }
303}