mz_adapter/
webhook.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use anyhow::Context;
15use chrono::{DateTime, Utc};
16use derivative::Derivative;
17use mz_ore::cast::CastFrom;
18use mz_repr::{Datum, Diff, Row, RowArena, Timestamp};
19use mz_secrets::SecretsReader;
20use mz_secrets::cache::CachingSecretsReader;
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaders, WebhookValidation, WebhookValidationSecret};
22use mz_storage_client::controller::MonotonicAppender;
23use mz_storage_client::statistics::WebhookStatistics;
24use mz_storage_types::controller::StorageError;
25use tokio::sync::Semaphore;
26
27use crate::optimize::dataflows::{ExprPrepStyle, prep_scalar_expr};
28
29/// Errors returns when attempting to append to a webhook.
30#[derive(thiserror::Error, Debug)]
31pub enum AppendWebhookError {
32    // A secret that we need for validation has gone missing.
33    #[error("could not read a required secret")]
34    MissingSecret,
35    #[error("the provided request body is not UTF-8: {msg}")]
36    InvalidUtf8Body { msg: String },
37    #[error("the provided request body is not valid JSON: {msg}")]
38    InvalidJsonBody { msg: String },
39    #[error("webhook source '{database}.{schema}.{name}' does not exist")]
40    UnknownWebhook {
41        database: String,
42        schema: String,
43        name: String,
44    },
45    #[error("failed to validate the request")]
46    ValidationFailed,
47    // Note: we should _NEVER_ add more detail to this error, including the actual error we got
48    // when running validation. This is because the error messages might contain info about the
49    // arguments provided to the validation expression, we could contains user SECRETs. So by
50    // including any more detail we might accidentally expose SECRETs.
51    #[error("validation error")]
52    ValidationError,
53    #[error("internal channel closed")]
54    ChannelClosed,
55    #[error("internal error: {0:?}")]
56    InternalError(#[from] anyhow::Error),
57    #[error("internal storage failure! {0:?}")]
58    StorageError(#[from] StorageError<mz_repr::Timestamp>),
59}
60
61/// Contains all of the components necessary for running webhook validation.
62///
63/// To actually validate a webhook request call [`AppendWebhookValidator::eval`].
64#[derive(Clone)]
65pub struct AppendWebhookValidator {
66    validation: WebhookValidation,
67    secrets_reader: CachingSecretsReader,
68}
69
70impl AppendWebhookValidator {
71    pub fn new(validation: WebhookValidation, secrets_reader: CachingSecretsReader) -> Self {
72        AppendWebhookValidator {
73            validation,
74            secrets_reader,
75        }
76    }
77
78    pub async fn eval(
79        self,
80        body: bytes::Bytes,
81        headers: Arc<BTreeMap<String, String>>,
82        received_at: DateTime<Utc>,
83    ) -> Result<bool, AppendWebhookError> {
84        let AppendWebhookValidator {
85            validation,
86            secrets_reader,
87        } = self;
88
89        let WebhookValidation {
90            mut expression,
91            relation_desc: _,
92            secrets,
93            bodies: body_columns,
94            headers: header_columns,
95        } = validation;
96
97        // Use the secrets reader to get any secrets.
98        let mut secret_contents = BTreeMap::new();
99        for WebhookValidationSecret {
100            id,
101            column_idx,
102            use_bytes,
103        } in secrets
104        {
105            let secret = secrets_reader
106                .read(id)
107                .await
108                .map_err(|_| AppendWebhookError::MissingSecret)?;
109            secret_contents.insert(column_idx, (secret, use_bytes));
110        }
111
112        // Transform any calls to `now()` into a constant representing of the current time.
113        //
114        // Note: we do this outside the closure, because otherwise there are some odd catch unwind
115        // boundary errors, and this shouldn't be too computationally expensive.
116        prep_scalar_expr(
117            &mut expression,
118            ExprPrepStyle::WebhookValidation { now: received_at },
119        )
120        .map_err(|err| {
121            tracing::error!(?err, "failed to evaluate current time");
122            AppendWebhookError::ValidationError
123        })?;
124
125        // Create a closure to run our validation, this allows lifetimes and unwind boundaries to
126        // work.
127        let validate = move || {
128            // Gather our Datums for evaluation
129            //
130            // TODO(parkmycar): Re-use the RowArena when we implement rate limiting.
131            let temp_storage = RowArena::default();
132            let mut datums = Vec::with_capacity(
133                body_columns.len() + header_columns.len() + secret_contents.len(),
134            );
135
136            // Append all of our body columns.
137            for (column_idx, use_bytes) in body_columns {
138                assert_eq!(column_idx, datums.len(), "body index and datums mismatch!");
139
140                let datum = if use_bytes {
141                    Datum::Bytes(&body[..])
142                } else {
143                    let s = std::str::from_utf8(&body[..])
144                        .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
145                    Datum::String(s)
146                };
147                datums.push(datum);
148            }
149
150            // Append all of our header columns, re-using Row packings.
151            //
152            let headers_byte = std::cell::OnceCell::new();
153            let headers_text = std::cell::OnceCell::new();
154            for (column_idx, use_bytes) in header_columns {
155                assert_eq!(column_idx, datums.len(), "index and datums mismatch!");
156
157                let row = if use_bytes {
158                    headers_byte.get_or_init(|| {
159                        let mut row = Row::with_capacity(1);
160                        let mut packer = row.packer();
161                        packer.push_dict(
162                            headers
163                                .iter()
164                                .map(|(name, val)| (name.as_str(), Datum::Bytes(val.as_bytes()))),
165                        );
166                        row
167                    })
168                } else {
169                    headers_text.get_or_init(|| {
170                        let mut row = Row::with_capacity(1);
171                        let mut packer = row.packer();
172                        packer.push_dict(
173                            headers
174                                .iter()
175                                .map(|(name, val)| (name.as_str(), Datum::String(val))),
176                        );
177                        row
178                    })
179                };
180                datums.push(row.unpack_first());
181            }
182
183            // Append all of our secrets to our datums, in the correct column order.
184            for column_idx in datums.len()..datums.len() + secret_contents.len() {
185                // Get the secret that corresponds with what is the next "column";
186                let (secret, use_bytes) = secret_contents
187                    .get(&column_idx)
188                    .expect("more secrets to provide, but none for the next column");
189
190                if *use_bytes {
191                    datums.push(Datum::Bytes(secret));
192                } else {
193                    let secret_str = std::str::from_utf8(&secret[..]).expect("valid UTF-8");
194                    datums.push(Datum::String(secret_str));
195                }
196            }
197
198            // Run our validation
199            let valid = expression
200                .eval(&datums[..], &temp_storage)
201                .map_err(|_| AppendWebhookError::ValidationError)?;
202            match valid {
203                Datum::True => Ok::<_, AppendWebhookError>(true),
204                Datum::False | Datum::Null => Ok(false),
205                _ => unreachable!("Creating a webhook source asserts we return a boolean"),
206            }
207        };
208
209        // Then run the validation itself.
210        let valid = mz_ore::task::spawn_blocking(
211            || "webhook-validator-expr",
212            move || {
213                // Since the validation expression is technically a user defined function, we want to
214                // be extra careful and guard against issues taking down the entire process.
215                mz_ore::panic::catch_unwind(validate).map_err(|_| {
216                    tracing::error!("panic while validating webhook request!");
217                    AppendWebhookError::ValidationError
218                })
219            },
220        )
221        .await
222        .context("joining on validation")
223        .map_err(|e| {
224            tracing::error!("Failed to run validation for webhook, {e}");
225            AppendWebhookError::ValidationError
226        })??;
227
228        valid
229    }
230}
231
232#[derive(Derivative, Clone)]
233#[derivative(Debug)]
234pub struct AppendWebhookResponse {
235    /// Channel to monotonically append rows to a webhook source.
236    pub tx: WebhookAppender,
237    /// Column type for the `body` column.
238    pub body_format: WebhookBodyFormat,
239    /// Types of the columns for the headers of a request.
240    pub header_tys: WebhookHeaders,
241    /// Expression used to validate a webhook request.
242    #[derivative(Debug = "ignore")]
243    pub validator: Option<AppendWebhookValidator>,
244}
245
246/// A wrapper around [`MonotonicAppender`] that can get closed by the `Coordinator` if the webhook
247/// gets modified.
248#[derive(Clone, Debug)]
249pub struct WebhookAppender {
250    tx: MonotonicAppender<Timestamp>,
251    guard: WebhookAppenderGuard,
252    // Shared statistics related to this webhook.
253    stats: Arc<WebhookStatistics>,
254}
255
256impl WebhookAppender {
257    /// Checks if the [`WebhookAppender`] has closed.
258    pub fn is_closed(&self) -> bool {
259        self.guard.is_closed()
260    }
261
262    /// Appends updates to the linked webhook source.
263    pub async fn append(&self, updates: Vec<(Row, Diff)>) -> Result<(), AppendWebhookError> {
264        if self.is_closed() {
265            return Err(AppendWebhookError::ChannelClosed);
266        }
267
268        let count = u64::cast_from(updates.len());
269        self.stats
270            .updates_staged
271            .fetch_add(count, Ordering::Relaxed);
272        let updates = updates.into_iter().map(|update| update.into()).collect();
273        self.tx.append(updates).await?;
274        self.stats
275            .updates_committed
276            .fetch_add(count, Ordering::Relaxed);
277        Ok(())
278    }
279
280    /// Increment the `messages_received` user-facing statistics. This
281    /// should be incremented even if the request is invalid.
282    pub fn increment_messages_received(&self, msgs: u64) {
283        self.stats
284            .messages_received
285            .fetch_add(msgs, Ordering::Relaxed);
286    }
287
288    /// Increment the `bytes_received` user-facing statistics. This
289    /// should be incremented even if the request is invalid.
290    pub fn increment_bytes_received(&self, bytes: u64) {
291        self.stats
292            .bytes_received
293            .fetch_add(bytes, Ordering::Relaxed);
294    }
295
296    pub(crate) fn new(
297        tx: MonotonicAppender<Timestamp>,
298        guard: WebhookAppenderGuard,
299        stats: Arc<WebhookStatistics>,
300    ) -> Self {
301        WebhookAppender { tx, guard, stats }
302    }
303}
304
/// When a webhook, or its containing schema and database, gets modified we need to invalidate
/// any outstanding [`WebhookAppender`]s. This is because `Adapter`s will cache
/// [`WebhookAppender`]s to increase performance, and the (database, schema, name) tuple they
/// cached an appender for is now incorrect.
#[derive(Clone, Debug)]
pub struct WebhookAppenderGuard {
    /// Flag set to `true` when the owning `WebhookAppenderInvalidator` is dropped.
    is_closed: Arc<AtomicBool>,
}

impl WebhookAppenderGuard {
    /// Returns `true` if the associated [`WebhookAppender`]s have been invalidated.
    pub fn is_closed(&self) -> bool {
        self.is_closed.load(Ordering::SeqCst)
    }
}
319
320/// A handle to invalidate [`WebhookAppender`]s. See the comment on [`WebhookAppenderGuard`] for
321/// more detail.
322///
323/// Note: to invalidate the associated [`WebhookAppender`]s, you must drop the corresponding
324/// [`WebhookAppenderInvalidator`].
325#[derive(Debug)]
326pub struct WebhookAppenderInvalidator {
327    is_closed: Arc<AtomicBool>,
328}
329// We want to enforce unique ownership over the ability to invalidate a `WebhookAppender`.
330static_assertions::assert_not_impl_all!(WebhookAppenderInvalidator: Clone);
331
332impl WebhookAppenderInvalidator {
333    pub(crate) fn new() -> WebhookAppenderInvalidator {
334        let is_closed = Arc::new(AtomicBool::new(false));
335        WebhookAppenderInvalidator { is_closed }
336    }
337
338    pub fn guard(&self) -> WebhookAppenderGuard {
339        WebhookAppenderGuard {
340            is_closed: Arc::clone(&self.is_closed),
341        }
342    }
343}
344
345impl Drop for WebhookAppenderInvalidator {
346    fn drop(&mut self) {
347        self.is_closed.store(true, Ordering::SeqCst);
348    }
349}
350
/// The fully qualified name of a webhook source, as a `(database, schema, name)` tuple.
pub type WebhookAppenderName = (String, String, String);
352
353/// A cache of [`WebhookAppender`]s and other metadata required for appending to a wbhook source.
354///
355/// Entries in the cache get invalidated when a [`WebhookAppender`] closes, at which point the
356/// entry should be dropped from the cache and a request made to the `Coordinator` for a new one.
357#[derive(Debug, Clone)]
358pub struct WebhookAppenderCache {
359    pub entries: Arc<tokio::sync::Mutex<BTreeMap<WebhookAppenderName, AppendWebhookResponse>>>,
360}
361
362impl WebhookAppenderCache {
363    pub fn new() -> Self {
364        WebhookAppenderCache {
365            entries: Arc::new(tokio::sync::Mutex::new(BTreeMap::new())),
366        }
367    }
368}
369
370/// Manages how many concurrent webhook requests we allow at once.
371#[derive(Debug, Clone)]
372pub struct WebhookConcurrencyLimiter {
373    semaphore: Arc<Semaphore>,
374    prev_limit: usize,
375}
376
377impl WebhookConcurrencyLimiter {
378    pub fn new(limit: usize) -> Self {
379        let semaphore = Arc::new(Semaphore::new(limit));
380
381        WebhookConcurrencyLimiter {
382            semaphore,
383            prev_limit: limit,
384        }
385    }
386
387    /// Returns the underlying [`Semaphore`] used for limiting.
388    pub fn semaphore(&self) -> Arc<Semaphore> {
389        Arc::clone(&self.semaphore)
390    }
391
392    /// Updates the limit of how many concurrent requests can be run at once.
393    pub fn set_limit(&mut self, new_limit: usize) {
394        if new_limit > self.prev_limit {
395            // Add permits.
396            let diff = new_limit.saturating_sub(self.prev_limit);
397            tracing::debug!("Adding {diff} permits");
398
399            self.semaphore.add_permits(diff);
400        } else if new_limit < self.prev_limit {
401            // Remove permits.
402            let diff = self.prev_limit.saturating_sub(new_limit);
403            let diff = u32::try_from(diff).unwrap_or(u32::MAX);
404            tracing::debug!("Removing {diff} permits");
405
406            let semaphore = self.semaphore();
407
408            // Kind of janky, but the recommended way to reduce the amount of permits is to spawn
409            // a task the acquires and then forgets old permits.
410            mz_ore::task::spawn(|| "webhook-concurrency-limiter-drop-permits", async move {
411                if let Ok(permit) = Semaphore::acquire_many_owned(semaphore, diff).await {
412                    permit.forget()
413                }
414            });
415        }
416
417        // Store our new limit.
418        self.prev_limit = new_limit;
419        tracing::debug!("New limit, {} permits", self.prev_limit);
420    }
421}
422
423impl Default for WebhookConcurrencyLimiter {
424    fn default() -> Self {
425        WebhookConcurrencyLimiter::new(mz_sql::WEBHOOK_CONCURRENCY_LIMIT)
426    }
427}
428
#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::WebhookConcurrencyLimiter;

    // Exercises raising and then lowering the limit of a `WebhookConcurrencyLimiter`.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn smoke_test_concurrency_limiter() {
        let mut limiter = WebhookConcurrencyLimiter::new(10);

        // Exhaust every permit allowed by the initial limit.
        let first = limiter.semaphore();
        let _all_ten = first.try_acquire_many(10).expect("acquire");

        // With the limit exhausted, no further permits are available.
        let second = limiter.semaphore();
        assert_err!(second.try_acquire());

        // Raising the limit frees up permits immediately.
        limiter.set_limit(15);
        let _one_more = second.try_acquire().expect("acquire");

        // Lowering the limit removes permits in a background task, so give it time to run.
        limiter.set_limit(5);
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

        // We're over the new limit again, so acquiring must fail.
        assert_err!(second.try_acquire());
    }
}