mz_adapter/coord/
privatelink_status.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::num::NonZeroU32;
11use std::sync::Arc;
12
13use governor::{Quota, RateLimiter};
14
15use mz_ore::future::OreStreamExt;
16use mz_ore::task::spawn;
17
18use crate::coord::Coordinator;
19
20use super::Message;
21
22impl Coordinator {
23    pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self) {
24        let internal_cmd_tx = self.internal_cmd_tx.clone();
25        let rate_quota: u32 = self
26            .catalog
27            .system_config()
28            .privatelink_status_update_quota_per_minute();
29
30        if let Some(controller) = &self.cloud_resource_controller {
31            let controller = Arc::clone(controller);
32            spawn(|| "privatelink_vpc_endpoint_watch", async move {
33                let mut stream = controller.watch_vpc_endpoints().await;
34                // Using a per-minute quota implies a burst-size of the same amount
35                let rate_limiter = RateLimiter::direct(Quota::per_minute(
36                    NonZeroU32::new(rate_quota).expect("will be non-zero"),
37                ));
38
39                loop {
40                    // Wait for events to become available
41                    if let Some(new_events) = stream.recv_many(20).await {
42                        // Wait until we're permitted to tell the coordinator about the events
43                        // Note that the stream is backed by a https://docs.rs/kube/latest/kube/runtime/fn.watcher.html,
44                        // which means its safe for us to rate limit for an arbitrarily long time and expect the stream
45                        // to continue to work, despite not being polled
46                        rate_limiter.until_ready().await;
47
48                        // Send the event batch to the coordinator to be written
49                        let _ =
50                            internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(new_events));
51                    }
52                }
53            });
54        }
55    }
56}