1use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_adapter::{AppendWebhookError, AppendWebhookResponse, WebhookAppenderCache};
16use mz_ore::cast::CastFrom;
17use mz_ore::retry::{Retry, RetryResult};
18use mz_ore::str::StrExt;
19use mz_repr::adt::jsonb::Jsonb;
20use mz_repr::{Datum, Diff, Row, RowPacker, ScalarType, Timestamp};
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
22use mz_storage_types::controller::StorageError;
23
24use axum::extract::{Path, State};
25use axum::response::IntoResponse;
26use bytes::Bytes;
27use http::StatusCode;
28use thiserror::Error;
29
30use crate::http::WebhookState;
31
32pub async fn handle_webhook(
33 State(WebhookState {
34 adapter_client_rx,
35 webhook_cache,
36 }): State<WebhookState>,
37 Path((database, schema, name)): Path<(String, String, String)>,
38 headers: http::HeaderMap,
39 body: Bytes,
40) -> impl IntoResponse {
41 let adapter_client = adapter_client_rx.clone().await.expect("sender not dropped");
42 let mut headers_s = BTreeMap::new();
44 for (name, val) in headers.iter() {
45 if let Ok(val_s) = val.to_str().map(|s| s.to_string()) {
46 let existing = headers_s.insert(name.as_str().to_string(), val_s);
48 if existing.is_some() {
49 let msg = format!("{} provided more than once", name.as_str());
50 return Err(WebhookError::InvalidHeaders(msg));
51 }
52 }
53 }
54 let headers = Arc::new(headers_s);
55
56 Retry::default()
58 .max_tries(2)
59 .retry_async(|_| async {
60 let result = append_webhook(
61 &adapter_client,
62 &webhook_cache,
63 &database,
64 &schema,
65 &name,
66 &body,
67 &headers,
68 )
69 .await;
70
71 match result {
74 Ok(()) => RetryResult::Ok(()),
75 Err(e @ AppendWebhookError::ChannelClosed) => RetryResult::RetryableErr(e),
76 Err(e) => RetryResult::FatalErr(e),
77 }
78 })
79 .await?;
80
81 Ok::<_, WebhookError>(())
82}
83
84async fn append_webhook(
87 adapter_client: &mz_adapter::Client,
88 webhook_cache: &WebhookAppenderCache,
89 database: &str,
90 schema: &str,
91 name: &str,
92 body: &Bytes,
93 headers: &Arc<BTreeMap<String, String>>,
94) -> Result<(), AppendWebhookError> {
95 let (database, schema, name) = (database.to_string(), schema.to_string(), name.to_string());
97
98 let received_at = adapter_client.now();
100
101 let AppendWebhookResponse {
103 tx,
104 body_format,
105 header_tys,
106 validator,
107 } = async {
108 let mut guard = webhook_cache.entries.lock().await;
109
110 match guard.remove(&(database.clone(), schema.clone(), name.clone())) {
112 Some(appender) if !appender.tx.is_closed() => {
113 guard.insert((database, schema, name), appender.clone());
114 Ok::<_, AppendWebhookError>(appender)
115 }
116 _ => {
120 tracing::info!(?database, ?schema, ?name, "fetching webhook appender");
121 adapter_client.metrics().webhook_get_appender.inc();
122
123 let appender = adapter_client
125 .get_webhook_appender(database.clone(), schema.clone(), name.clone())
126 .await?;
127
128 guard.insert((database, schema, name), appender.clone());
129
130 Ok(appender)
131 }
132 }
133 }
134 .await?;
135
136 tx.increment_messages_received(1);
139 tx.increment_bytes_received(u64::cast_from(body.len()));
140
141 if let Some(validator) = validator {
143 let valid = validator
144 .eval(Bytes::clone(body), Arc::clone(headers), received_at)
145 .await?;
146 if !valid {
147 return Err(AppendWebhookError::ValidationFailed);
148 }
149 }
150
151 let rows = pack_rows(body, &body_format, headers, &header_tys)?;
153
154 tx.append(rows).await?;
156
157 Ok(())
158}
159
160fn pack_rows(
165 body: &[u8],
166 body_format: &WebhookBodyFormat,
167 headers: &BTreeMap<String, String>,
168 header_tys: &WebhookHeaders,
169) -> Result<Vec<(Row, Diff)>, AppendWebhookError> {
170 let rows = transform_body(body, body_format)?
173 .into_iter()
174 .map(|row| pack_header(row, headers, header_tys).map(|row| (row, Diff::ONE)))
175 .collect::<Result<_, _>>()?;
176 Ok(rows)
177}
178
179fn transform_body(
181 body: &[u8],
182 format: &WebhookBodyFormat,
183) -> Result<Vec<BodyRow>, AppendWebhookError> {
184 let rows = match format {
185 WebhookBodyFormat::Bytes => {
186 vec![Row::pack_slice(&[Datum::Bytes(body)])]
187 }
188 WebhookBodyFormat::Text => {
189 let s = std::str::from_utf8(body)
190 .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
191 vec![Row::pack_slice(&[Datum::String(s)])]
192 }
193 WebhookBodyFormat::Json { array } => {
194 let objects = serde_json::Deserializer::from_slice(body)
195 .into_iter::<serde_json::Value>()
198 .flat_map(|value| match value {
200 Ok(serde_json::Value::Array(inners)) if *array => {
201 itertools::Either::Left(inners.into_iter().map(Result::Ok))
202 }
203 value => itertools::Either::Right(std::iter::once(value)),
204 })
205 .collect::<Result<Vec<_>, _>>()
206 .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?;
207
208 let rows = objects
211 .into_iter()
212 .map(|o| {
214 let row = Jsonb::from_serde_json(o)
215 .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?
216 .into_row();
217 Ok::<_, AppendWebhookError>(row)
218 })
219 .collect::<Result<_, _>>()?;
220
221 rows
222 }
223 };
224
225 let body_rows = rows.into_iter().map(BodyRow).collect();
228
229 Ok(body_rows)
230}
231
232fn pack_header(
234 mut body_row: BodyRow,
235 headers: &BTreeMap<String, String>,
236 header_tys: &WebhookHeaders,
237) -> Result<Row, AppendWebhookError> {
238 let num_cols = 1 + header_tys.num_columns();
240 let mut num_cols_written = 1;
242
243 let mut packer = RowPacker::for_existing_row(body_row.inner_mut());
244
245 if let Some(filters) = &header_tys.header_column {
247 packer.push_dict(
248 filter_headers(headers, filters).map(|(name, val)| (name, Datum::String(val))),
249 );
250 num_cols_written += 1;
251 }
252
253 for idx in num_cols_written..num_cols {
255 let (header_name, use_bytes) = header_tys
256 .mapped_headers
257 .get(&idx)
258 .ok_or_else(|| anyhow::anyhow!("Invalid header column index {idx}"))?;
259 let header = headers.get(header_name);
260 let datum = match header {
261 Some(h) if *use_bytes => Datum::Bytes(h.as_bytes()),
262 Some(h) => Datum::String(h),
263 None => Datum::Null,
264 };
265 packer.push(datum);
266 }
267
268 Ok(body_row.into_inner())
269}
270
271fn filter_headers<'a: 'b, 'b>(
272 headers: &'a BTreeMap<String, String>,
273 filters: &'b WebhookHeaderFilters,
274) -> impl Iterator<Item = (&'a str, &'a str)> + 'b {
275 headers
276 .iter()
277 .filter(|(header_name, _val)| {
278 filters.block.is_empty() || !filters.block.contains(*header_name)
280 })
281 .filter(|(header_name, _val)| {
282 filters.allow.is_empty() || filters.allow.contains(*header_name)
284 })
285 .map(|(key, val)| (key.as_str(), val.as_str()))
286}
287
288#[repr(transparent)]
293struct BodyRow(Row);
294
295impl BodyRow {
296 fn inner_mut(&mut self) -> &mut Row {
298 &mut self.0
299 }
300
301 fn into_inner(self) -> Row {
303 self.0
304 }
305}
306
/// Errors produced while handling a webhook request.
///
/// Each variant is turned into an HTTP status code and message by this type's
/// `IntoResponse` implementation.
#[derive(Error, Debug)]
pub enum WebhookError {
    /// Nothing was found at the requested location. The payload is either the quoted
    /// `database.schema.name` path or a storage collection id.
    #[error("no object was found at the path {}", .0.quoted())]
    NotFound(String),
    /// A secret required by the webhook source could not be found (from
    /// `AppendWebhookError::MissingSecret`).
    #[error("the required auth could not be found")]
    SecretMissing,
    /// The request headers were rejected, e.g. a header name was provided more than once.
    #[error("headers of request were invalid: {0}")]
    InvalidHeaders(String),
    /// The body could not be decoded as `ty`; `msg` carries the decoder's error message.
    #[error("failed to deserialize body as {ty:?}: {msg}")]
    InvalidBody { ty: ScalarType, msg: String },
    /// The source's validation check evaluated to false for this request.
    #[error("failed to validate the request")]
    ValidationFailed,
    /// Evaluating the validation check itself returned an error.
    #[error("error occurred while running validation")]
    ValidationError,
    /// The service cannot currently accept the request (e.g. storage is shutting down).
    #[error("service unavailable")]
    Unavailable,
    /// Any other storage-layer failure. It is rendered with its `Debug` representation.
    #[error("internal storage failure! {0:?}")]
    InternalStorageError(StorageError<Timestamp>),
    /// Any other internal failure.
    #[error("internal failure! {0:?}")]
    Internal(#[from] anyhow::Error),
}
335
336impl From<AppendWebhookError> for WebhookError {
337 fn from(err: AppendWebhookError) -> Self {
338 match err {
339 AppendWebhookError::MissingSecret => WebhookError::SecretMissing,
340 AppendWebhookError::ValidationError => WebhookError::ValidationError,
341 AppendWebhookError::InvalidUtf8Body { msg } => WebhookError::InvalidBody {
342 ty: ScalarType::String,
343 msg,
344 },
345 AppendWebhookError::InvalidJsonBody { msg } => WebhookError::InvalidBody {
346 ty: ScalarType::Jsonb,
347 msg,
348 },
349 AppendWebhookError::UnknownWebhook {
350 database,
351 schema,
352 name,
353 } => WebhookError::NotFound(format!("'{database}.{schema}.{name}'")),
354 AppendWebhookError::ValidationFailed => WebhookError::ValidationFailed,
355 AppendWebhookError::ChannelClosed => {
356 WebhookError::Internal(anyhow::anyhow!("channel closed"))
357 }
358 AppendWebhookError::StorageError(storage_err) => {
359 match storage_err {
360 StorageError::IdentifierMissing(id) | StorageError::IdentifierInvalid(id) => {
362 WebhookError::NotFound(id.to_string())
363 }
364 StorageError::ShuttingDown(_) => WebhookError::Unavailable,
365 e => WebhookError::InternalStorageError(e),
366 }
367 }
368 AppendWebhookError::InternalError(err) => WebhookError::Internal(err),
369 }
370 }
371}
372
373impl IntoResponse for WebhookError {
374 fn into_response(self) -> axum::response::Response {
375 match self {
376 e @ WebhookError::NotFound(_) | e @ WebhookError::SecretMissing => {
377 (StatusCode::NOT_FOUND, e.to_string()).into_response()
378 }
379 e @ WebhookError::InvalidBody { .. }
380 | e @ WebhookError::ValidationFailed
381 | e @ WebhookError::ValidationError => {
382 (StatusCode::BAD_REQUEST, e.to_string()).into_response()
383 }
384 e @ WebhookError::InvalidHeaders(_) => {
385 (StatusCode::UNAUTHORIZED, e.to_string()).into_response()
386 }
387 e @ WebhookError::Unavailable => {
388 (StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
389 }
390 e @ WebhookError::InternalStorageError(StorageError::ResourceExhausted(_)) => {
391 (StatusCode::TOO_MANY_REQUESTS, e.to_string()).into_response()
392 }
393 e @ WebhookError::InternalStorageError(_) => {
394 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
395 }
396 WebhookError::Internal(e) => (
397 StatusCode::INTERNAL_SERVER_ERROR,
398 e.root_cause().to_string(),
399 )
400 .into_response(),
401 }
402 }
403}
404
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use axum::response::IntoResponse;
    use bytes::Bytes;
    use http::StatusCode;
    use mz_adapter::AppendWebhookError;
    use mz_ore::assert_none;
    use mz_repr::{GlobalId, Row};
    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
    use mz_storage_types::controller::StorageError;
    use proptest::prelude::*;
    use proptest::strategy::Union;

    use super::{WebhookError, filter_headers, pack_rows};

    /// Strategy for arbitrary JSON documents. Leaves are `null`, booleans, `i64` numbers
    /// and strings, recursively nested inside arrays and objects (recursion depth 4).
    fn arbitrary_json() -> impl Strategy<Value = serde_json::Value> {
        // Non-container JSON values.
        let json_leaf = Union::new(vec![
            any::<()>().prop_map(|_| serde_json::Value::Null).boxed(),
            any::<bool>().prop_map(serde_json::Value::Bool).boxed(),
            any::<i64>()
                .prop_map(|x| serde_json::Value::Number(x.into()))
                .boxed(),
            any::<String>().prop_map(serde_json::Value::String).boxed(),
        ]);

        // Wrap leaves (and previously built values) in arrays and objects.
        json_leaf.prop_recursive(4, 32, 8, |element| {
            Union::new(vec![
                prop::collection::vec(element.clone(), 0..16)
                    .prop_map(serde_json::Value::Array)
                    .boxed(),
                prop::collection::hash_map(".*", element, 0..16)
                    .prop_map(|map| serde_json::Value::Object(map.into_iter().collect()))
                    .boxed(),
            ])
        })
    }

    /// Asserts that `rows` contains `expected_rows` rows, each with exactly
    /// `expected_cols` columns.
    #[track_caller]
    fn check_rows(rows: &[(Row, mz_repr::Diff)], expected_rows: usize, expected_cols: usize) {
        assert_eq!(rows.len(), expected_rows);
        for (row, _diff) in rows {
            assert_eq!(row.unpack().len(), expected_cols);
        }
    }

    #[mz_ore::test]
    fn smoke_test_storage_error_response_status() {
        // Resource exhaustion is reported as a rate-limit response.
        let resp = WebhookError::from(AppendWebhookError::StorageError(
            StorageError::ResourceExhausted("test"),
        ))
        .into_response();
        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);

        // A missing collection is reported as not found.
        let resp = WebhookError::from(AppendWebhookError::StorageError(
            StorageError::IdentifierMissing(GlobalId::User(42)),
        ))
        .into_response();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[mz_ore::test]
    fn smoke_test_invalid_body_response_status() {
        // Malformed JSON fails to pack and is reported as a client error.
        let Err(err) = pack_rows(
            b"{ not json",
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        ) else {
            panic!("malformed JSON should fail to pack");
        };
        assert!(matches!(err, AppendWebhookError::InvalidJsonBody { .. }));
        let resp = WebhookError::from(err).into_response();
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);

        // So does a text body that is not valid UTF-8.
        let Err(err) = pack_rows(
            b"\xff\xfe",
            &WebhookBodyFormat::Text,
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        ) else {
            panic!("invalid UTF-8 should fail to pack");
        };
        assert!(matches!(err, AppendWebhookError::InvalidUtf8Body { .. }));
        let resp = WebhookError::from(err).into_response();
        assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    }

    #[mz_ore::test]
    fn smoke_test_filter_headers() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = BTreeSet::from(["bar".to_string()]);

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);

        // Block list only: everything except "foo".
        let mut filters = WebhookHeaderFilters::default();
        filters.block.clone_from(&block);

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_eq!(h.next().unwrap().0, "baz");
        assert_none!(h.next());

        // Allow list only: just "bar".
        let mut filters = WebhookHeaderFilters::default();
        filters.allow.clone_from(&allow);

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());

        // Both lists: the intersection of the two filters.
        let mut filters = WebhookHeaderFilters::default();
        filters.allow = allow;
        filters.block = block;

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());
    }

    #[mz_ore::test]
    fn filter_headers_block_overrides_allow() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = block.clone();

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);
        let filters = WebhookHeaderFilters { block, allow };

        // "foo" is both allowed and blocked; blocking wins, and nothing else is allowed.
        let mut h = filter_headers(&headers, &filters);
        assert_none!(h.next());
    }

    #[mz_ore::test]
    fn test_json_array_single() {
        let single_raw = r#"
        {
            "event_type": "i am a single object",
            "another_field": 42
        }
        "#;

        // A single object yields one row with array expansion off...
        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);

        // ...and with it on, since only arrays are expanded.
        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);
    }

    #[mz_ore::test]
    fn test_json_deserializer_multi() {
        let multi_raw = r#"
            [
                { "event_type": "smol" },
                { "event_type": "dog" }
            ]
        "#;

        // Without expansion the whole array is a single row.
        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);

        // With expansion each element becomes its own row.
        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 2);
    }

    #[mz_ore::test]
    fn test_json_whitespace_delimited_values() {
        // A body holding several consecutive JSON values yields one row per value.
        let stream_raw = "{\"a\": 1}\n{\"b\": 2}\n[3, 4]";

        let rows = pack_rows(
            stream_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 3);

        // Array expansion applies to each top-level array in the stream.
        let rows = pack_rows(
            stream_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 4);
    }

    proptest! {
        #[mz_ore::test]
        fn proptest_pack_row_never_panics(
            body: Vec<u8>,
            body_ty: WebhookBodyFormat,
            headers: BTreeMap<String, String>,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);

            let filters = WebhookHeaderFilters { block, allow };
            // Alternate between bytes and text for the mapped header columns.
            let mut use_bytes = false;
            // Map half of the real headers plus some absent ones; columns 0 and 1 are the
            // body and the header map, so mapped columns start at index 2.
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    use_bytes = !use_bytes;
                    (idx + 2, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: Some(filters),
                mapped_headers,
            };

            // Arbitrary bodies may legitimately fail to decode; we only require no panic.
            let _ = pack_rows(&body[..], &body_ty, &headers, &header_tys);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_bytes(
            body: Vec<u8>,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Bytes;
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_strings(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Text;
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_selective_headers(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);
            let body_ty = WebhookBodyFormat::Text;

            let filters = WebhookHeaderFilters { block, allow };
            // Alternate between bytes and text for the mapped header columns.
            let mut use_bytes = false;
            // Mapped columns start after the body and, if present, the header map column.
            let column_offset = if include_headers { 2 } else { 1 };
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    use_bytes = !use_bytes;
                    (idx + column_offset, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: include_headers.then_some(filters),
                mapped_headers,
            };

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_json_with_array_expansion(
            body in arbitrary_json(),
            expand_array: bool,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let json_raw = serde_json::to_vec(&body).unwrap();
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(
                &json_raw[..],
                &WebhookBodyFormat::Json { array: expand_array },
                &headers,
                &header_tys,
            )
            .unwrap();

            // Only a top-level array is expanded, and only when requested.
            let expected_num_rows = match body {
                serde_json::Value::Array(inner) if expand_array => inner.len(),
                _ => 1,
            };
            check_rows(&rows, expected_num_rows, header_tys.num_columns() + 1);
        }
    }
}