sentry/transports/
tokio_thread.rs

1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::mpsc::{sync_channel, SyncSender};
3use std::sync::Arc;
4use std::thread::{self, JoinHandle};
5use std::time::Duration;
6
7use super::ratelimit::{RateLimiter, RateLimitingCategory};
8use crate::{sentry_debug, Envelope};
9
10enum Task {
11    SendEnvelope(Envelope),
12    Flush(SyncSender<()>),
13    Shutdown,
14}
15
16pub struct TransportThread {
17    sender: SyncSender<Task>,
18    shutdown: Arc<AtomicBool>,
19    handle: Option<JoinHandle<()>>,
20}
21
22impl TransportThread {
23    pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
24    where
25        SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
26        // NOTE: returning RateLimiter here, otherwise we are in borrow hell
27        SendFuture: std::future::Future<Output = RateLimiter>,
28    {
29        let (sender, receiver) = sync_channel(30);
30        let shutdown = Arc::new(AtomicBool::new(false));
31        let shutdown_worker = shutdown.clone();
32        let handle = thread::Builder::new()
33            .name("sentry-transport".into())
34            .spawn(move || {
35                // create a runtime on the transport thread
36                let rt = tokio::runtime::Builder::new_current_thread()
37                    .enable_all()
38                    .build()
39                    .unwrap();
40
41                let mut rl = RateLimiter::new();
42
43                // and block on an async fn in this runtime/thread
44                rt.block_on(async move {
45                    for task in receiver.into_iter() {
46                        if shutdown_worker.load(Ordering::SeqCst) {
47                            return;
48                        }
49                        let envelope = match task {
50                            Task::SendEnvelope(envelope) => envelope,
51                            Task::Flush(sender) => {
52                                sender.send(()).ok();
53                                continue;
54                            }
55                            Task::Shutdown => {
56                                return;
57                            }
58                        };
59
60                        if let Some(time_left) =  rl.is_disabled(RateLimitingCategory::Any) {
61                            sentry_debug!(
62                                "Skipping event send because we're disabled due to rate limits for {}s",
63                                time_left.as_secs()
64                            );
65                            continue;
66                        }
67                        match rl.filter_envelope(envelope) {
68                            Some(envelope) => {
69                                rl = send(envelope, rl).await;
70                            },
71                            None => {
72                                sentry_debug!("Envelope was discarded due to per-item rate limits");
73                            },
74                        };
75                    }
76                })
77            })
78            .ok();
79
80        Self {
81            sender,
82            shutdown,
83            handle,
84        }
85    }
86
87    pub fn send(&self, envelope: Envelope) {
88        // Using send here would mean that when the channel fills up for whatever
89        // reason, trying to send an envelope would block everything. We'd rather
90        // drop the envelope in that case.
91        if let Err(e) = self.sender.try_send(Task::SendEnvelope(envelope)) {
92            sentry_debug!("envelope dropped: {e}");
93        }
94    }
95
96    pub fn flush(&self, timeout: Duration) -> bool {
97        let (sender, receiver) = sync_channel(1);
98        let _ = self.sender.send(Task::Flush(sender));
99        receiver.recv_timeout(timeout).is_ok()
100    }
101}
102
103impl Drop for TransportThread {
104    fn drop(&mut self) {
105        self.shutdown.store(true, Ordering::SeqCst);
106        let _ = self.sender.send(Task::Shutdown);
107        if let Some(handle) = self.handle.take() {
108            handle.join().unwrap();
109        }
110    }
111}