mz_environmentd/http/
webhook.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for handling events from a Webhook source.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_adapter::{AppendWebhookError, AppendWebhookResponse, WebhookAppenderCache};
16use mz_ore::cast::CastFrom;
17use mz_ore::retry::{Retry, RetryResult};
18use mz_ore::str::StrExt;
19use mz_repr::adt::jsonb::Jsonb;
20use mz_repr::{Datum, Diff, Row, RowPacker, ScalarType, Timestamp};
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
22use mz_storage_types::controller::StorageError;
23
24use axum::extract::{Path, State};
25use axum::response::IntoResponse;
26use bytes::Bytes;
27use http::StatusCode;
28use thiserror::Error;
29
30use crate::http::WebhookState;
31
32pub async fn handle_webhook(
33    State(WebhookState {
34        adapter_client_rx,
35        webhook_cache,
36    }): State<WebhookState>,
37    Path((database, schema, name)): Path<(String, String, String)>,
38    headers: http::HeaderMap,
39    body: Bytes,
40) -> impl IntoResponse {
41    let adapter_client = adapter_client_rx.clone().await.expect("sender not dropped");
42    // Collect headers into a map, while converting them into strings.
43    let mut headers_s = BTreeMap::new();
44    for (name, val) in headers.iter() {
45        if let Ok(val_s) = val.to_str().map(|s| s.to_string()) {
46            // If a header is included more than once, bail returning an error to the user.
47            let existing = headers_s.insert(name.as_str().to_string(), val_s);
48            if existing.is_some() {
49                let msg = format!("{} provided more than once", name.as_str());
50                return Err(WebhookError::InvalidHeaders(msg));
51            }
52        }
53    }
54    let headers = Arc::new(headers_s);
55
56    // Append to the webhook source, retrying if we race with a concurrent `ALTER SOURCE` op.
57    Retry::default()
58        .max_tries(2)
59        .retry_async(|_| async {
60            let result = append_webhook(
61                &adapter_client,
62                &webhook_cache,
63                &database,
64                &schema,
65                &name,
66                &body,
67                &headers,
68            )
69            .await;
70
71            // Note: think carefully before adding more errors here, we need to make sure we don't
72            // append data more than once.
73            match result {
74                Ok(()) => RetryResult::Ok(()),
75                Err(e @ AppendWebhookError::ChannelClosed) => RetryResult::RetryableErr(e),
76                Err(e) => RetryResult::FatalErr(e),
77            }
78        })
79        .await?;
80
81    Ok::<_, WebhookError>(())
82}
83
84/// Append the provided `body` and `headers` to the webhook source identified via `database`,
85/// `schema`, and `name`.
86async fn append_webhook(
87    adapter_client: &mz_adapter::Client,
88    webhook_cache: &WebhookAppenderCache,
89    database: &str,
90    schema: &str,
91    name: &str,
92    body: &Bytes,
93    headers: &Arc<BTreeMap<String, String>>,
94) -> Result<(), AppendWebhookError> {
95    // Shenanigans to get the types working for the async retry.
96    let (database, schema, name) = (database.to_string(), schema.to_string(), name.to_string());
97
98    // Record the time we receive the request, for use if validation checks the current timestamp.
99    let received_at = adapter_client.now();
100
101    // Get an appender for the provided object, if that object exists.
102    let AppendWebhookResponse {
103        tx,
104        body_format,
105        header_tys,
106        validator,
107    } = async {
108        let mut guard = webhook_cache.entries.lock().await;
109
110        // Remove the appender from our map, only re-insert it, if it's valid.
111        match guard.remove(&(database.clone(), schema.clone(), name.clone())) {
112            Some(appender) if !appender.tx.is_closed() => {
113                guard.insert((database, schema, name), appender.clone());
114                Ok::<_, AppendWebhookError>(appender)
115            }
116            // We don't have a valid appender, so we need to get one.
117            //
118            // Note: we hold the lock while we acquire and appender to prevent a dogpile.
119            _ => {
120                tracing::info!(?database, ?schema, ?name, "fetching webhook appender");
121                adapter_client.metrics().webhook_get_appender.inc();
122
123                // Acquire and cache a new appender.
124                let appender = adapter_client
125                    .get_webhook_appender(database.clone(), schema.clone(), name.clone())
126                    .await?;
127
128                guard.insert((database, schema, name), appender.clone());
129
130                Ok(appender)
131            }
132        }
133    }
134    .await?;
135
136    // These must happen before validation as we do not know if validation or
137    // packing will succeed and appending will begin
138    tx.increment_messages_received(1);
139    tx.increment_bytes_received(u64::cast_from(body.len()));
140
141    // If this source requires validation, then validate!
142    if let Some(validator) = validator {
143        let valid = validator
144            .eval(Bytes::clone(body), Arc::clone(headers), received_at)
145            .await?;
146        if !valid {
147            return Err(AppendWebhookError::ValidationFailed);
148        }
149    }
150
151    // Pack our body and headers into a Row.
152    let rows = pack_rows(body, &body_format, headers, &header_tys)?;
153
154    // Send the row to get appended.
155    tx.append(rows).await?;
156
157    Ok(())
158}
159
160/// Packs the body and headers of a webhook request into as many rows as necessary.
161///
162/// TODO(parkmycar): Should we be consolidating the returned Rows here? Presumably something in
163/// storage would already be doing it, so no need to do it twice?
164fn pack_rows(
165    body: &[u8],
166    body_format: &WebhookBodyFormat,
167    headers: &BTreeMap<String, String>,
168    header_tys: &WebhookHeaders,
169) -> Result<Vec<(Row, Diff)>, AppendWebhookError> {
170    // This method isn't that "deep" but it reflects the way we intend for the packing process to
171    // work and makes testing easier.
172    let rows = transform_body(body, body_format)?
173        .into_iter()
174        .map(|row| pack_header(row, headers, header_tys).map(|row| (row, Diff::ONE)))
175        .collect::<Result<_, _>>()?;
176    Ok(rows)
177}
178
179/// Transforms the body of a webhook request into a `Vec<BodyRow>`.
180fn transform_body(
181    body: &[u8],
182    format: &WebhookBodyFormat,
183) -> Result<Vec<BodyRow>, AppendWebhookError> {
184    let rows = match format {
185        WebhookBodyFormat::Bytes => {
186            vec![Row::pack_slice(&[Datum::Bytes(body)])]
187        }
188        WebhookBodyFormat::Text => {
189            let s = std::str::from_utf8(body)
190                .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
191            vec![Row::pack_slice(&[Datum::String(s)])]
192        }
193        WebhookBodyFormat::Json { array } => {
194            let objects = serde_json::Deserializer::from_slice(body)
195                // Automatically expand multiple JSON objects delimited by whitespace, e.g.
196                // newlines, into a single batch.
197                .into_iter::<serde_json::Value>()
198                // Optionally expand a JSON array into separate rows, if requested.
199                .flat_map(|value| match value {
200                    Ok(serde_json::Value::Array(inners)) if *array => {
201                        itertools::Either::Left(inners.into_iter().map(Result::Ok))
202                    }
203                    value => itertools::Either::Right(std::iter::once(value)),
204                })
205                .collect::<Result<Vec<_>, _>>()
206                .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?;
207
208            // Note: `into_iter()` should be re-using the underlying allocation of the `objects`
209            // vector, and it's more readable to split these into separate iterators.
210            let rows = objects
211                .into_iter()
212                // Map a JSON object into a Row.
213                .map(|o| {
214                    let row = Jsonb::from_serde_json(o)
215                        .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?
216                        .into_row();
217                    Ok::<_, AppendWebhookError>(row)
218                })
219                .collect::<Result<_, _>>()?;
220
221            rows
222        }
223    };
224
225    // A `Row` cannot describe its schema without unpacking it. To add some safety we wrap the
226    // returned `Row`s in a newtype to signify they already have the "body" column packed.
227    let body_rows = rows.into_iter().map(BodyRow).collect();
228
229    Ok(body_rows)
230}
231
232/// Pack the headers of a request into a [`Row`].
233fn pack_header(
234    mut body_row: BodyRow,
235    headers: &BTreeMap<String, String>,
236    header_tys: &WebhookHeaders,
237) -> Result<Row, AppendWebhookError> {
238    // 1 column for the body plus however many are needed for the headers.
239    let num_cols = 1 + header_tys.num_columns();
240    // The provided Row already has the Body written.
241    let mut num_cols_written = 1;
242
243    let mut packer = RowPacker::for_existing_row(body_row.inner_mut());
244
245    // Pack the headers into our row, if required.
246    if let Some(filters) = &header_tys.header_column {
247        packer.push_dict(
248            filter_headers(headers, filters).map(|(name, val)| (name, Datum::String(val))),
249        );
250        num_cols_written += 1;
251    }
252
253    // Pack the mapped headers.
254    for idx in num_cols_written..num_cols {
255        let (header_name, use_bytes) = header_tys
256            .mapped_headers
257            .get(&idx)
258            .ok_or_else(|| anyhow::anyhow!("Invalid header column index {idx}"))?;
259        let header = headers.get(header_name);
260        let datum = match header {
261            Some(h) if *use_bytes => Datum::Bytes(h.as_bytes()),
262            Some(h) => Datum::String(h),
263            None => Datum::Null,
264        };
265        packer.push(datum);
266    }
267
268    Ok(body_row.into_inner())
269}
270
271fn filter_headers<'a: 'b, 'b>(
272    headers: &'a BTreeMap<String, String>,
273    filters: &'b WebhookHeaderFilters,
274) -> impl Iterator<Item = (&'a str, &'a str)> + 'b {
275    headers
276        .iter()
277        .filter(|(header_name, _val)| {
278            // If our block list is empty, then don't filter anything.
279            filters.block.is_empty() || !filters.block.contains(*header_name)
280        })
281        .filter(|(header_name, _val)| {
282            // If our allow list is empty, then don't filter anything.
283            filters.allow.is_empty() || filters.allow.contains(*header_name)
284        })
285        .map(|(key, val)| (key.as_str(), val.as_str()))
286}
287
288/// A [`Row`] that has the body of a request already packed into it.
289///
290/// Note: if you're constructing a [`BodyRow`] you need to guarantee the only column packed into
291/// the [`Row`] is a single "body" column.
292#[repr(transparent)]
293struct BodyRow(Row);
294
295impl BodyRow {
296    /// Obtain a mutable reference to the inner [`Row`].
297    fn inner_mut(&mut self) -> &mut Row {
298        &mut self.0
299    }
300
301    /// Return the inner [`Row`].
302    fn into_inner(self) -> Row {
303        self.0
304    }
305}
306
307/// Errors we can encounter when appending data to a Webhook Source.
308///
309/// Webhook sources are a bit special since they are handled by `environmentd` (all other sources
310/// are handled by `clusterd`) and data is "pushed" to them (all other source pull data). The
311/// errors also generally need to map to HTTP status codes that we can use to respond to a webhook
312/// request. As such, webhook errors don't cleanly map to any existing error type, hence the
313/// existence of this error type.
314#[derive(Error, Debug)]
315pub enum WebhookError {
316    #[error("no object was found at the path {}", .0.quoted())]
317    NotFound(String),
318    #[error("the required auth could not be found")]
319    SecretMissing,
320    #[error("headers of request were invalid: {0}")]
321    InvalidHeaders(String),
322    #[error("failed to deserialize body as {ty:?}: {msg}")]
323    InvalidBody { ty: ScalarType, msg: String },
324    #[error("failed to validate the request")]
325    ValidationFailed,
326    #[error("error occurred while running validation")]
327    ValidationError,
328    #[error("service unavailable")]
329    Unavailable,
330    #[error("internal storage failure! {0:?}")]
331    InternalStorageError(StorageError<Timestamp>),
332    #[error("internal failure! {0:?}")]
333    Internal(#[from] anyhow::Error),
334}
335
336impl From<AppendWebhookError> for WebhookError {
337    fn from(err: AppendWebhookError) -> Self {
338        match err {
339            AppendWebhookError::MissingSecret => WebhookError::SecretMissing,
340            AppendWebhookError::ValidationError => WebhookError::ValidationError,
341            AppendWebhookError::InvalidUtf8Body { msg } => WebhookError::InvalidBody {
342                ty: ScalarType::String,
343                msg,
344            },
345            AppendWebhookError::InvalidJsonBody { msg } => WebhookError::InvalidBody {
346                ty: ScalarType::Jsonb,
347                msg,
348            },
349            AppendWebhookError::UnknownWebhook {
350                database,
351                schema,
352                name,
353            } => WebhookError::NotFound(format!("'{database}.{schema}.{name}'")),
354            AppendWebhookError::ValidationFailed => WebhookError::ValidationFailed,
355            AppendWebhookError::ChannelClosed => {
356                WebhookError::Internal(anyhow::anyhow!("channel closed"))
357            }
358            AppendWebhookError::StorageError(storage_err) => {
359                match storage_err {
360                    // TODO(parkmycar): Maybe map this to a HTTP 410 Gone instead of 404?
361                    StorageError::IdentifierMissing(id) | StorageError::IdentifierInvalid(id) => {
362                        WebhookError::NotFound(id.to_string())
363                    }
364                    StorageError::ShuttingDown(_) => WebhookError::Unavailable,
365                    e => WebhookError::InternalStorageError(e),
366                }
367            }
368            AppendWebhookError::InternalError(err) => WebhookError::Internal(err),
369        }
370    }
371}
372
373impl IntoResponse for WebhookError {
374    fn into_response(self) -> axum::response::Response {
375        match self {
376            e @ WebhookError::NotFound(_) | e @ WebhookError::SecretMissing => {
377                (StatusCode::NOT_FOUND, e.to_string()).into_response()
378            }
379            e @ WebhookError::InvalidBody { .. }
380            | e @ WebhookError::ValidationFailed
381            | e @ WebhookError::ValidationError => {
382                (StatusCode::BAD_REQUEST, e.to_string()).into_response()
383            }
384            e @ WebhookError::InvalidHeaders(_) => {
385                (StatusCode::UNAUTHORIZED, e.to_string()).into_response()
386            }
387            e @ WebhookError::Unavailable => {
388                (StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
389            }
390            e @ WebhookError::InternalStorageError(StorageError::ResourceExhausted(_)) => {
391                (StatusCode::TOO_MANY_REQUESTS, e.to_string()).into_response()
392            }
393            e @ WebhookError::InternalStorageError(_) => {
394                (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
395            }
396            WebhookError::Internal(e) => (
397                StatusCode::INTERNAL_SERVER_ERROR,
398                e.root_cause().to_string(),
399            )
400                .into_response(),
401        }
402    }
403}
404
405#[cfg(test)]
406mod tests {
407    use std::collections::{BTreeMap, BTreeSet};
408
409    use axum::response::IntoResponse;
410    use bytes::Bytes;
411    use http::StatusCode;
412    use mz_adapter::AppendWebhookError;
413    use mz_ore::assert_none;
414    use mz_repr::{GlobalId, Row};
415    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
416    use mz_storage_types::controller::StorageError;
417    use proptest::prelude::*;
418    use proptest::strategy::Union;
419
420    use super::{WebhookError, filter_headers, pack_rows};
421
422    // TODO(parkmycar): Move this strategy to `ore`?
423    fn arbitrary_json() -> impl Strategy<Value = serde_json::Value> {
424        let json_leaf = Union::new(vec![
425            any::<()>().prop_map(|_| serde_json::Value::Null).boxed(),
426            any::<bool>().prop_map(serde_json::Value::Bool).boxed(),
427            any::<i64>()
428                .prop_map(|x| serde_json::Value::Number(x.into()))
429                .boxed(),
430            any::<String>().prop_map(serde_json::Value::String).boxed(),
431        ]);
432
433        json_leaf.prop_recursive(4, 32, 8, |element| {
434            Union::new(vec![
435                prop::collection::vec(element.clone(), 0..16)
436                    .prop_map(serde_json::Value::Array)
437                    .boxed(),
438                prop::collection::hash_map(".*", element, 0..16)
439                    .prop_map(|map| serde_json::Value::Object(map.into_iter().collect()))
440                    .boxed(),
441            ])
442        })
443    }
444
445    #[track_caller]
446    fn check_rows(rows: &Vec<(Row, mz_repr::Diff)>, expected_rows: usize, expected_cols: usize) {
447        assert_eq!(rows.len(), expected_rows);
448        for (row, _diff) in rows {
449            assert_eq!(row.unpack().len(), expected_cols);
450        }
451    }
452
453    #[mz_ore::test]
454    fn smoke_test_storage_error_response_status() {
455        // Resource exhausted should get mapped to a specific status code.
456        let resp = WebhookError::from(AppendWebhookError::StorageError(
457            StorageError::ResourceExhausted("test"),
458        ))
459        .into_response();
460        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);
461
462        // IdentifierMissing should also get mapped to a specific status code.
463        let resp = WebhookError::from(AppendWebhookError::StorageError(
464            StorageError::IdentifierMissing(GlobalId::User(42)),
465        ))
466        .into_response();
467        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
468    }
469
470    #[mz_ore::test]
471    fn smoke_test_filter_headers() {
472        let block = BTreeSet::from(["foo".to_string()]);
473        let allow = BTreeSet::from(["bar".to_string()]);
474
475        let headers = BTreeMap::from([
476            ("foo".to_string(), "1".to_string()),
477            ("bar".to_string(), "2".to_string()),
478            ("baz".to_string(), "3".to_string()),
479        ]);
480        let mut filters = WebhookHeaderFilters::default();
481        filters.block.clone_from(&block);
482
483        let mut h = filter_headers(&headers, &filters);
484        assert_eq!(h.next().unwrap().0, "bar");
485        assert_eq!(h.next().unwrap().0, "baz");
486        assert_none!(h.next());
487
488        let mut filters = WebhookHeaderFilters::default();
489        filters.allow.clone_from(&allow);
490
491        let mut h = filter_headers(&headers, &filters);
492        assert_eq!(h.next().unwrap().0, "bar");
493        assert_none!(h.next());
494
495        let mut filters = WebhookHeaderFilters::default();
496        filters.allow = allow;
497        filters.block = block;
498
499        let mut h = filter_headers(&headers, &filters);
500        assert_eq!(h.next().unwrap().0, "bar");
501        assert_none!(h.next());
502    }
503
504    #[mz_ore::test]
505    fn filter_headers_block_overrides_allow() {
506        let block = BTreeSet::from(["foo".to_string()]);
507        let allow = block.clone();
508
509        let headers = BTreeMap::from([
510            ("foo".to_string(), "1".to_string()),
511            ("bar".to_string(), "2".to_string()),
512            ("baz".to_string(), "3".to_string()),
513        ]);
514        let filters = WebhookHeaderFilters { block, allow };
515
516        // We should yield nothing since we block the only thing we allow.
517        let mut h = filter_headers(&headers, &filters);
518        assert_none!(h.next());
519    }
520
521    #[mz_ore::test]
522    fn test_json_array_single() {
523        let single_raw = r#"
524        {
525            "event_type": "i am a single object",
526            "another_field": 42
527        }
528        "#;
529
530        // We should get a single Row regardless of whether or not we're requested to expand.
531        let rows = pack_rows(
532            single_raw.as_bytes(),
533            &WebhookBodyFormat::Json { array: false },
534            &BTreeMap::default(),
535            &WebhookHeaders::default(),
536        )
537        .unwrap();
538        assert_eq!(rows.len(), 1);
539
540        // We should get a single Row regardless of whether or not we're requested to expand.
541        let rows = pack_rows(
542            single_raw.as_bytes(),
543            &WebhookBodyFormat::Json { array: true },
544            &BTreeMap::default(),
545            &WebhookHeaders::default(),
546        )
547        .unwrap();
548        assert_eq!(rows.len(), 1);
549    }
550
551    #[mz_ore::test]
552    fn test_json_deserializer_multi() {
553        let multi_raw = r#"
554            [
555                { "event_type": "smol" },
556                { "event_type": "dog" }
557            ]
558        "#;
559
560        let rows = pack_rows(
561            multi_raw.as_bytes(),
562            &WebhookBodyFormat::Json { array: false },
563            &BTreeMap::default(),
564            &WebhookHeaders::default(),
565        )
566        .unwrap();
567        // If we don't expand the body, we should have a single row.
568        assert_eq!(rows.len(), 1);
569
570        let rows = pack_rows(
571            multi_raw.as_bytes(),
572            &WebhookBodyFormat::Json { array: true },
573            &BTreeMap::default(),
574            &WebhookHeaders::default(),
575        )
576        .unwrap();
577        // If we _do_ expand the body, we should have a two rows.
578        assert_eq!(rows.len(), 2);
579    }
580
581    proptest! {
582        #[mz_ore::test]
583        fn proptest_pack_row_never_panics(
584            body: Vec<u8>,
585            body_ty: WebhookBodyFormat,
586            headers: BTreeMap<String, String>,
587            non_existent_headers: Vec<String>,
588            block: BTreeSet<String>,
589            allow: BTreeSet<String>,
590        ) {
591            let body = Bytes::from(body);
592
593            // Include the headers column with a random set of block and allow.
594            let filters = WebhookHeaderFilters { block, allow };
595            // Include half of the existing headers, append on some non-existing ones too.
596            let mut use_bytes = false;
597            let mapped_headers = headers
598                .keys()
599                .take(headers.len() / 2)
600                .chain(non_existent_headers.iter())
601                .cloned()
602                .enumerate()
603                .map(|(idx, name)| {
604                    use_bytes = !use_bytes;
605                    (idx + 2, (name, use_bytes))
606                })
607                .collect();
608            let header_tys = WebhookHeaders {
609                header_column: Some(filters),
610                mapped_headers,
611            };
612
613            // Call this method to make sure it doesn't panic.
614            let _ = pack_rows(&body[..], &body_ty, &headers, &header_tys);
615        }
616
617        #[mz_ore::test]
618        fn proptest_pack_row_succeeds_for_bytes(
619            body: Vec<u8>,
620            headers: BTreeMap<String, String>,
621            include_headers: bool,
622        ) {
623            let body = Bytes::from(body);
624
625            let body_ty = WebhookBodyFormat::Bytes;
626            let mut header_tys = WebhookHeaders::default();
627            header_tys.header_column = include_headers.then(Default::default);
628
629            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
630            check_rows(&rows, 1, header_tys.num_columns() + 1);
631        }
632
633        #[mz_ore::test]
634        fn proptest_pack_row_succeeds_for_strings(
635            body: String,
636            headers: BTreeMap<String, String>,
637            include_headers: bool,
638        ) {
639            let body = Bytes::from(body);
640
641            let body_ty = WebhookBodyFormat::Text;
642            let mut header_tys = WebhookHeaders::default();
643            header_tys.header_column = include_headers.then(Default::default);
644
645            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
646            check_rows(&rows, 1, header_tys.num_columns() + 1);
647        }
648
649        #[mz_ore::test]
650        fn proptest_pack_row_succeeds_for_selective_headers(
651            body: String,
652            headers: BTreeMap<String, String>,
653            include_headers: bool,
654            non_existent_headers: Vec<String>,
655            block: BTreeSet<String>,
656            allow: BTreeSet<String>,
657        ) {
658            let body = Bytes::from(body);
659            let body_ty = WebhookBodyFormat::Text;
660
661            // Include the headers column with a random set of block and allow.
662            let filters = WebhookHeaderFilters { block, allow };
663            // Include half of the existing headers, append on some non-existing ones too.
664            let mut use_bytes = false;
665            let column_offset = if include_headers { 2 } else { 1 };
666            let mapped_headers = headers
667                .keys()
668                .take(headers.len() / 2)
669                .chain(non_existent_headers.iter())
670                .cloned()
671                .enumerate()
672                .map(|(idx, name)| {
673                    use_bytes = !use_bytes;
674                    (idx + column_offset, (name, use_bytes))
675                })
676                .collect();
677            let header_tys = WebhookHeaders {
678                header_column: include_headers.then_some(filters),
679                mapped_headers,
680            };
681
682            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
683            check_rows(&rows, 1, header_tys.num_columns() + 1);
684        }
685
686        #[mz_ore::test]
687        fn proptest_pack_json_with_array_expansion(
688            body in arbitrary_json(),
689            expand_array: bool,
690            headers: BTreeMap<String, String>,
691            include_headers: bool,
692        ) {
693            let json_raw = serde_json::to_vec(&body).unwrap();
694            let mut header_tys = WebhookHeaders::default();
695            header_tys.header_column = include_headers.then(Default::default);
696
697            let rows = pack_rows(
698                &json_raw[..],
699                &WebhookBodyFormat::Json { array: expand_array },
700                &headers,
701                &header_tys,
702            )
703            .unwrap();
704
705            let expected_num_rows = match body {
706                serde_json::Value::Array(inner) if expand_array => inner.len(),
707                _ => 1,
708            };
709            check_rows(&rows, expected_num_rows, header_tys.num_columns() + 1);
710        }
711    }
712}