mz_adapter/coord/privatelink_status.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::num::NonZeroU32;
11use std::sync::Arc;
12
13use governor::{Quota, RateLimiter};
14
15use mz_ore::future::OreStreamExt;
16use mz_ore::task::spawn;
17
18use crate::coord::Coordinator;
19
20use super::Message;
21
22impl Coordinator {
23 pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self) {
24 let internal_cmd_tx = self.internal_cmd_tx.clone();
25 let rate_quota: u32 = self
26 .catalog
27 .system_config()
28 .privatelink_status_update_quota_per_minute();
29
30 if let Some(controller) = &self.cloud_resource_controller {
31 let controller = Arc::clone(controller);
32 spawn(|| "privatelink_vpc_endpoint_watch", async move {
33 let mut stream = controller.watch_vpc_endpoints().await;
34 // Using a per-minute quota implies a burst-size of the same amount
35 let rate_limiter = RateLimiter::direct(Quota::per_minute(
36 NonZeroU32::new(rate_quota).expect("will be non-zero"),
37 ));
38
39 loop {
40 // Wait for events to become available
41 if let Some(new_events) = stream.recv_many(20).await {
42 // Wait until we're permitted to tell the coordinator about the events
43 // Note that the stream is backed by a https://docs.rs/kube/latest/kube/runtime/fn.watcher.html,
44 // which means its safe for us to rate limit for an arbitrarily long time and expect the stream
45 // to continue to work, despite not being polled
46 rate_limiter.until_ready().await;
47
48 // Send the event batch to the coordinator to be written
49 let _ =
50 internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(new_events));
51 }
52 }
53 });
54 }
55 }
56}