1use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_adapter::{AppendWebhookError, AppendWebhookResponse, WebhookAppenderCache};
16use mz_ore::cast::CastFrom;
17use mz_ore::retry::{Retry, RetryResult};
18use mz_ore::str::StrExt;
19use mz_repr::adt::jsonb::Jsonb;
20use mz_repr::{Datum, Diff, Row, RowPacker, ScalarType, Timestamp};
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
22use mz_storage_types::controller::StorageError;
23
24use axum::extract::{Path, State};
25use axum::response::IntoResponse;
26use bytes::Bytes;
27use http::StatusCode;
28use thiserror::Error;
29
30use crate::http::WebhookState;
31
32pub async fn handle_webhook(
33 State(WebhookState {
34 adapter_client,
35 webhook_cache,
36 }): State<WebhookState>,
37 Path((database, schema, name)): Path<(String, String, String)>,
38 headers: http::HeaderMap,
39 body: Bytes,
40) -> impl IntoResponse {
41 let mut headers_s = BTreeMap::new();
43 for (name, val) in headers.iter() {
44 if let Ok(val_s) = val.to_str().map(|s| s.to_string()) {
45 let existing = headers_s.insert(name.as_str().to_string(), val_s);
47 if existing.is_some() {
48 let msg = format!("{} provided more than once", name.as_str());
49 return Err(WebhookError::InvalidHeaders(msg));
50 }
51 }
52 }
53 let headers = Arc::new(headers_s);
54
55 Retry::default()
57 .max_tries(2)
58 .retry_async(|_| async {
59 let result = append_webhook(
60 &adapter_client,
61 &webhook_cache,
62 &database,
63 &schema,
64 &name,
65 &body,
66 &headers,
67 )
68 .await;
69
70 match result {
73 Ok(()) => RetryResult::Ok(()),
74 Err(e @ AppendWebhookError::ChannelClosed) => RetryResult::RetryableErr(e),
75 Err(e) => RetryResult::FatalErr(e),
76 }
77 })
78 .await?;
79
80 Ok::<_, WebhookError>(())
81}
82
83async fn append_webhook(
86 adapter_client: &mz_adapter::Client,
87 webhook_cache: &WebhookAppenderCache,
88 database: &str,
89 schema: &str,
90 name: &str,
91 body: &Bytes,
92 headers: &Arc<BTreeMap<String, String>>,
93) -> Result<(), AppendWebhookError> {
94 let (database, schema, name) = (database.to_string(), schema.to_string(), name.to_string());
96
97 let received_at = adapter_client.now();
99
100 let AppendWebhookResponse {
102 tx,
103 body_format,
104 header_tys,
105 validator,
106 } = async {
107 let mut guard = webhook_cache.entries.lock().await;
108
109 match guard.remove(&(database.clone(), schema.clone(), name.clone())) {
111 Some(appender) if !appender.tx.is_closed() => {
112 guard.insert((database, schema, name), appender.clone());
113 Ok::<_, AppendWebhookError>(appender)
114 }
115 _ => {
119 tracing::info!(?database, ?schema, ?name, "fetching webhook appender");
120 adapter_client.metrics().webhook_get_appender.inc();
121
122 let appender = adapter_client
124 .get_webhook_appender(database.clone(), schema.clone(), name.clone())
125 .await?;
126
127 guard.insert((database, schema, name), appender.clone());
128
129 Ok(appender)
130 }
131 }
132 }
133 .await?;
134
135 tx.increment_messages_received(1);
138 tx.increment_bytes_received(u64::cast_from(body.len()));
139
140 if let Some(validator) = validator {
142 let valid = validator
143 .eval(Bytes::clone(body), Arc::clone(headers), received_at)
144 .await?;
145 if !valid {
146 return Err(AppendWebhookError::ValidationFailed);
147 }
148 }
149
150 let rows = pack_rows(body, &body_format, headers, &header_tys)?;
152
153 tx.append(rows).await?;
155
156 Ok(())
157}
158
159fn pack_rows(
164 body: &[u8],
165 body_format: &WebhookBodyFormat,
166 headers: &BTreeMap<String, String>,
167 header_tys: &WebhookHeaders,
168) -> Result<Vec<(Row, Diff)>, AppendWebhookError> {
169 let rows = transform_body(body, body_format)?
172 .into_iter()
173 .map(|row| pack_header(row, headers, header_tys).map(|row| (row, Diff::ONE)))
174 .collect::<Result<_, _>>()?;
175 Ok(rows)
176}
177
178fn transform_body(
180 body: &[u8],
181 format: &WebhookBodyFormat,
182) -> Result<Vec<BodyRow>, AppendWebhookError> {
183 let rows = match format {
184 WebhookBodyFormat::Bytes => {
185 vec![Row::pack_slice(&[Datum::Bytes(body)])]
186 }
187 WebhookBodyFormat::Text => {
188 let s = std::str::from_utf8(body)
189 .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
190 vec![Row::pack_slice(&[Datum::String(s)])]
191 }
192 WebhookBodyFormat::Json { array } => {
193 let objects = serde_json::Deserializer::from_slice(body)
194 .into_iter::<serde_json::Value>()
197 .flat_map(|value| match value {
199 Ok(serde_json::Value::Array(inners)) if *array => {
200 itertools::Either::Left(inners.into_iter().map(Result::Ok))
201 }
202 value => itertools::Either::Right(std::iter::once(value)),
203 })
204 .collect::<Result<Vec<_>, _>>()
205 .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?;
206
207 let rows = objects
210 .into_iter()
211 .map(|o| {
213 let row = Jsonb::from_serde_json(o)
214 .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?
215 .into_row();
216 Ok::<_, AppendWebhookError>(row)
217 })
218 .collect::<Result<_, _>>()?;
219
220 rows
221 }
222 };
223
224 let body_rows = rows.into_iter().map(BodyRow).collect();
227
228 Ok(body_rows)
229}
230
231fn pack_header(
233 mut body_row: BodyRow,
234 headers: &BTreeMap<String, String>,
235 header_tys: &WebhookHeaders,
236) -> Result<Row, AppendWebhookError> {
237 let num_cols = 1 + header_tys.num_columns();
239 let mut num_cols_written = 1;
241
242 let mut packer = RowPacker::for_existing_row(body_row.inner_mut());
243
244 if let Some(filters) = &header_tys.header_column {
246 packer.push_dict(
247 filter_headers(headers, filters).map(|(name, val)| (name, Datum::String(val))),
248 );
249 num_cols_written += 1;
250 }
251
252 for idx in num_cols_written..num_cols {
254 let (header_name, use_bytes) = header_tys
255 .mapped_headers
256 .get(&idx)
257 .ok_or_else(|| anyhow::anyhow!("Invalid header column index {idx}"))?;
258 let header = headers.get(header_name);
259 let datum = match header {
260 Some(h) if *use_bytes => Datum::Bytes(h.as_bytes()),
261 Some(h) => Datum::String(h),
262 None => Datum::Null,
263 };
264 packer.push(datum);
265 }
266
267 Ok(body_row.into_inner())
268}
269
270fn filter_headers<'a: 'b, 'b>(
271 headers: &'a BTreeMap<String, String>,
272 filters: &'b WebhookHeaderFilters,
273) -> impl Iterator<Item = (&'a str, &'a str)> + 'b {
274 headers
275 .iter()
276 .filter(|(header_name, _val)| {
277 filters.block.is_empty() || !filters.block.contains(*header_name)
279 })
280 .filter(|(header_name, _val)| {
281 filters.allow.is_empty() || filters.allow.contains(*header_name)
283 })
284 .map(|(key, val)| (key.as_str(), val.as_str()))
285}
286
287#[repr(transparent)]
292struct BodyRow(Row);
293
294impl BodyRow {
295 fn inner_mut(&mut self) -> &mut Row {
297 &mut self.0
298 }
299
300 fn into_inner(self) -> Row {
302 self.0
303 }
304}
305
306#[derive(Error, Debug)]
314pub enum WebhookError {
315 #[error("no object was found at the path {}", .0.quoted())]
316 NotFound(String),
317 #[error("the required auth could not be found")]
318 SecretMissing,
319 #[error("headers of request were invalid: {0}")]
320 InvalidHeaders(String),
321 #[error("failed to deserialize body as {ty:?}: {msg}")]
322 InvalidBody { ty: ScalarType, msg: String },
323 #[error("failed to validate the request")]
324 ValidationFailed,
325 #[error("error occurred while running validation")]
326 ValidationError,
327 #[error("service unavailable")]
328 Unavailable,
329 #[error("internal storage failure! {0:?}")]
330 InternalStorageError(StorageError<Timestamp>),
331 #[error("internal failure! {0:?}")]
332 Internal(#[from] anyhow::Error),
333}
334
335impl From<AppendWebhookError> for WebhookError {
336 fn from(err: AppendWebhookError) -> Self {
337 match err {
338 AppendWebhookError::MissingSecret => WebhookError::SecretMissing,
339 AppendWebhookError::ValidationError => WebhookError::ValidationError,
340 AppendWebhookError::InvalidUtf8Body { msg } => WebhookError::InvalidBody {
341 ty: ScalarType::String,
342 msg,
343 },
344 AppendWebhookError::InvalidJsonBody { msg } => WebhookError::InvalidBody {
345 ty: ScalarType::Jsonb,
346 msg,
347 },
348 AppendWebhookError::UnknownWebhook {
349 database,
350 schema,
351 name,
352 } => WebhookError::NotFound(format!("'{database}.{schema}.{name}'")),
353 AppendWebhookError::ValidationFailed => WebhookError::ValidationFailed,
354 AppendWebhookError::ChannelClosed => {
355 WebhookError::Internal(anyhow::anyhow!("channel closed"))
356 }
357 AppendWebhookError::StorageError(storage_err) => {
358 match storage_err {
359 StorageError::IdentifierMissing(id) | StorageError::IdentifierInvalid(id) => {
361 WebhookError::NotFound(id.to_string())
362 }
363 StorageError::ShuttingDown(_) => WebhookError::Unavailable,
364 e => WebhookError::InternalStorageError(e),
365 }
366 }
367 AppendWebhookError::InternalError(err) => WebhookError::Internal(err),
368 }
369 }
370}
371
372impl IntoResponse for WebhookError {
373 fn into_response(self) -> axum::response::Response {
374 match self {
375 e @ WebhookError::NotFound(_) | e @ WebhookError::SecretMissing => {
376 (StatusCode::NOT_FOUND, e.to_string()).into_response()
377 }
378 e @ WebhookError::InvalidBody { .. }
379 | e @ WebhookError::ValidationFailed
380 | e @ WebhookError::ValidationError => {
381 (StatusCode::BAD_REQUEST, e.to_string()).into_response()
382 }
383 e @ WebhookError::InvalidHeaders(_) => {
384 (StatusCode::UNAUTHORIZED, e.to_string()).into_response()
385 }
386 e @ WebhookError::Unavailable => {
387 (StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
388 }
389 e @ WebhookError::InternalStorageError(StorageError::ResourceExhausted(_)) => {
390 (StatusCode::TOO_MANY_REQUESTS, e.to_string()).into_response()
391 }
392 e @ WebhookError::InternalStorageError(_) => {
393 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
394 }
395 WebhookError::Internal(e) => (
396 StatusCode::INTERNAL_SERVER_ERROR,
397 e.root_cause().to_string(),
398 )
399 .into_response(),
400 }
401 }
402}
403
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use axum::response::IntoResponse;
    use bytes::Bytes;
    use http::StatusCode;
    use mz_adapter::AppendWebhookError;
    use mz_ore::assert_none;
    use mz_repr::{GlobalId, Row};
    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
    use mz_storage_types::controller::StorageError;
    use proptest::prelude::*;
    use proptest::strategy::Union;

    use super::{WebhookError, filter_headers, pack_rows};

    /// Strategy producing arbitrary JSON values: null, booleans, `i64` numbers, and strings at
    /// the leaves, recursively nested inside arrays and objects.
    fn arbitrary_json() -> impl Strategy<Value = serde_json::Value> {
        use serde_json::Value;

        let leaf = Union::new(vec![
            any::<()>().prop_map(|_| Value::Null).boxed(),
            any::<bool>().prop_map(Value::Bool).boxed(),
            any::<i64>().prop_map(|n| Value::Number(n.into())).boxed(),
            any::<String>().prop_map(Value::String).boxed(),
        ]);

        leaf.prop_recursive(4, 32, 8, |inner| {
            let arrays = prop::collection::vec(inner.clone(), 0..16)
                .prop_map(Value::Array)
                .boxed();
            let objects = prop::collection::hash_map(".*", inner, 0..16)
                .prop_map(|fields| Value::Object(fields.into_iter().collect()))
                .boxed();
            Union::new(vec![arrays, objects])
        })
    }

    /// Asserts that `rows` holds `expected_rows` rows with `expected_cols` columns each.
    #[track_caller]
    fn check_rows(rows: &Vec<(Row, mz_repr::Diff)>, expected_rows: usize, expected_cols: usize) {
        assert_eq!(rows.len(), expected_rows);
        // Kept as a plain loop: assertions inside a closure would not be attributed to the
        // caller of this function.
        for (packed, _diff) in rows {
            assert_eq!(packed.unpack().len(), expected_cols);
        }
    }

    /// Builds header types with an unfiltered headers column when `include_headers` is set,
    /// and without any header columns otherwise.
    fn header_tys_with_column(include_headers: bool) -> WebhookHeaders {
        WebhookHeaders {
            header_column: include_headers.then(WebhookHeaderFilters::default),
            ..Default::default()
        }
    }

    #[mz_ore::test]
    fn smoke_test_storage_error_response_status() {
        // Resource exhaustion is reported as "too many requests".
        let exhausted = AppendWebhookError::StorageError(StorageError::ResourceExhausted("test"));
        let status = WebhookError::from(exhausted).into_response().status();
        assert_eq!(status, StatusCode::TOO_MANY_REQUESTS);

        // A missing identifier is reported as "not found".
        let missing =
            AppendWebhookError::StorageError(StorageError::IdentifierMissing(GlobalId::User(42)));
        let status = WebhookError::from(missing).into_response().status();
        assert_eq!(status, StatusCode::NOT_FOUND);
    }

    #[mz_ore::test]
    fn smoke_test_filter_headers() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = BTreeSet::from(["bar".to_string()]);

        let headers: BTreeMap<String, String> = [("foo", "1"), ("bar", "2"), ("baz", "3")]
            .into_iter()
            .map(|(key, val)| (key.to_string(), val.to_string()))
            .collect();

        // Block list only: everything except "foo" survives.
        let block_only = WebhookHeaderFilters {
            block: block.clone(),
            allow: BTreeSet::new(),
        };
        let mut kept = filter_headers(&headers, &block_only).map(|(name, _)| name);
        assert_eq!(kept.next(), Some("bar"));
        assert_eq!(kept.next(), Some("baz"));
        assert_none!(kept.next());

        // Allow list only: nothing but "bar" survives.
        let allow_only = WebhookHeaderFilters {
            block: BTreeSet::new(),
            allow: allow.clone(),
        };
        let mut kept = filter_headers(&headers, &allow_only).map(|(name, _)| name);
        assert_eq!(kept.next(), Some("bar"));
        assert_none!(kept.next());

        // Both lists at once.
        let both = WebhookHeaderFilters { block, allow };
        let mut kept = filter_headers(&headers, &both).map(|(name, _)| name);
        assert_eq!(kept.next(), Some("bar"));
        assert_none!(kept.next());
    }

    #[mz_ore::test]
    fn filter_headers_block_overrides_allow() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = block.clone();

        let headers: BTreeMap<String, String> = [("foo", "1"), ("bar", "2"), ("baz", "3")]
            .into_iter()
            .map(|(key, val)| (key.to_string(), val.to_string()))
            .collect();
        let filters = WebhookHeaderFilters { block, allow };

        // "foo" is both allowed and blocked, so no header is left.
        let mut kept = filter_headers(&headers, &filters);
        assert_none!(kept.next());
    }

    #[mz_ore::test]
    fn test_json_array_single() {
        let single_raw = r#"
        {
            "event_type": "i am a single object",
            "another_field": 42
        }
        "#;

        // A single object is one row, with or without array expansion.
        for array in [false, true] {
            let rows = pack_rows(
                single_raw.as_bytes(),
                &WebhookBodyFormat::Json { array },
                &BTreeMap::default(),
                &WebhookHeaders::default(),
            )
            .unwrap();
            assert_eq!(rows.len(), 1);
        }
    }

    #[mz_ore::test]
    fn test_json_deserializer_multi() {
        let multi_raw = r#"
            [
                { "event_type": "smol" },
                { "event_type": "dog" }
            ]
        "#;

        // Without expansion the array is one row; with expansion each element is a row.
        for (array, expected_rows) in [(false, 1), (true, 2)] {
            let rows = pack_rows(
                multi_raw.as_bytes(),
                &WebhookBodyFormat::Json { array },
                &BTreeMap::default(),
                &WebhookHeaders::default(),
            )
            .unwrap();
            assert_eq!(rows.len(), expected_rows);
        }
    }

    proptest! {
        #[mz_ore::test]
        fn proptest_pack_row_never_panics(
            body: Vec<u8>,
            body_ty: WebhookBodyFormat,
            headers: BTreeMap<String, String>,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);

            // Map the first half of the provided header names, followed by the extra names, to
            // consecutive columns starting at 2, alternating bytes/text (starting with bytes).
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(&non_existent_headers)
                .enumerate()
                .map(|(pos, header)| (pos + 2, (header.clone(), pos % 2 == 0)))
                .collect();
            let header_tys = WebhookHeaders {
                header_column: Some(WebhookHeaderFilters { block, allow }),
                mapped_headers,
            };

            // Only the absence of a panic matters here, not the result.
            let _ = pack_rows(&body[..], &body_ty, &headers, &header_tys);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_bytes(
            body: Vec<u8>,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);
            let header_tys = header_tys_with_column(include_headers);

            let rows =
                pack_rows(&body[..], &WebhookBodyFormat::Bytes, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_strings(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);
            let header_tys = header_tys_with_column(include_headers);

            let rows =
                pack_rows(&body[..], &WebhookBodyFormat::Text, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_selective_headers(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);

            // Mapped headers start right after the body column and, when present, the headers
            // column.
            let first_mapped_col = if include_headers { 2 } else { 1 };
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(&non_existent_headers)
                .enumerate()
                .map(|(pos, header)| (pos + first_mapped_col, (header.clone(), pos % 2 == 0)))
                .collect();
            let header_tys = WebhookHeaders {
                header_column: include_headers.then_some(WebhookHeaderFilters { block, allow }),
                mapped_headers,
            };

            let rows =
                pack_rows(&body[..], &WebhookBodyFormat::Text, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_json_with_array_expansion(
            body in arbitrary_json(),
            expand_array: bool,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let json_raw = serde_json::to_vec(&body).unwrap();
            let header_tys = header_tys_with_column(include_headers);

            let rows = pack_rows(
                &json_raw[..],
                &WebhookBodyFormat::Json { array: expand_array },
                &headers,
                &header_tys,
            )
            .unwrap();

            // Only a top-level array with expansion enabled yields more (or fewer) than one row.
            let expected_num_rows = match &body {
                serde_json::Value::Array(elements) if expand_array => elements.len(),
                _ => 1,
            };
            check_rows(&rows, expected_num_rows, header_tys.num_columns() + 1);
        }
    }
}