1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::num::NonZeroU32;
use std::sync::Arc;

use governor::{Quota, RateLimiter};

use mz_ore::future::OreStreamExt;
use mz_ore::task::spawn;

use crate::coord::Coordinator;

use super::Message;

impl Coordinator {
    pub(crate) fn spawn_privatelink_vpc_endpoints_watch_task(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let rate_quota: u32 = self
            .catalog
            .system_config()
            .privatelink_status_update_quota_per_minute();

        if let Some(controller) = &self.cloud_resource_controller {
            let controller = Arc::clone(controller);
            spawn(|| "privatelink_vpc_endpoint_watch", async move {
                let mut stream = controller.watch_vpc_endpoints().await;
                // Using a per-minute quota implies a burst-size of the same amount
                let rate_limiter = RateLimiter::direct(Quota::per_minute(
                    NonZeroU32::new(rate_quota).expect("will be non-zero"),
                ));

                loop {
                    // Wait for events to become available
                    if let Some(new_events) = stream.recv_many(20).await {
                        // Wait until we're permitted to tell the coordinator about the events
                        // Note that the stream is backed by a https://docs.rs/kube/latest/kube/runtime/fn.watcher.html,
                        // which means its safe for us to rate limit for an arbitrarily long time and expect the stream
                        // to continue to work, despite not being polled
                        rate_limiter.until_ready().await;

                        // Send the event batch to the coordinator to be written
                        let _ =
                            internal_cmd_tx.send(Message::PrivateLinkVpcEndpointEvents(new_events));
                    }
                }
            });
        }
    }
}