sentry/transports/
tokio_thread.rs
1use std::sync::atomic::{AtomicBool, Ordering};
2use std::sync::mpsc::{sync_channel, SyncSender};
3use std::sync::Arc;
4use std::thread::{self, JoinHandle};
5use std::time::Duration;
6
7use super::ratelimit::{RateLimiter, RateLimitingCategory};
8use crate::{sentry_debug, Envelope};
9
10enum Task {
11 SendEnvelope(Envelope),
12 Flush(SyncSender<()>),
13 Shutdown,
14}
15
16pub struct TransportThread {
17 sender: SyncSender<Task>,
18 shutdown: Arc<AtomicBool>,
19 handle: Option<JoinHandle<()>>,
20}
21
22impl TransportThread {
23 pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
24 where
25 SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
26 SendFuture: std::future::Future<Output = RateLimiter>,
28 {
29 let (sender, receiver) = sync_channel(30);
30 let shutdown = Arc::new(AtomicBool::new(false));
31 let shutdown_worker = shutdown.clone();
32 let handle = thread::Builder::new()
33 .name("sentry-transport".into())
34 .spawn(move || {
35 let rt = tokio::runtime::Builder::new_current_thread()
37 .enable_all()
38 .build()
39 .unwrap();
40
41 let mut rl = RateLimiter::new();
42
43 rt.block_on(async move {
45 for task in receiver.into_iter() {
46 if shutdown_worker.load(Ordering::SeqCst) {
47 return;
48 }
49 let envelope = match task {
50 Task::SendEnvelope(envelope) => envelope,
51 Task::Flush(sender) => {
52 sender.send(()).ok();
53 continue;
54 }
55 Task::Shutdown => {
56 return;
57 }
58 };
59
60 if let Some(time_left) = rl.is_disabled(RateLimitingCategory::Any) {
61 sentry_debug!(
62 "Skipping event send because we're disabled due to rate limits for {}s",
63 time_left.as_secs()
64 );
65 continue;
66 }
67 match rl.filter_envelope(envelope) {
68 Some(envelope) => {
69 rl = send(envelope, rl).await;
70 },
71 None => {
72 sentry_debug!("Envelope was discarded due to per-item rate limits");
73 },
74 };
75 }
76 })
77 })
78 .ok();
79
80 Self {
81 sender,
82 shutdown,
83 handle,
84 }
85 }
86
87 pub fn send(&self, envelope: Envelope) {
88 if let Err(e) = self.sender.try_send(Task::SendEnvelope(envelope)) {
92 sentry_debug!("envelope dropped: {e}");
93 }
94 }
95
96 pub fn flush(&self, timeout: Duration) -> bool {
97 let (sender, receiver) = sync_channel(1);
98 let _ = self.sender.send(Task::Flush(sender));
99 receiver.recv_timeout(timeout).is_ok()
100 }
101}
102
103impl Drop for TransportThread {
104 fn drop(&mut self) {
105 self.shutdown.store(true, Ordering::SeqCst);
106 let _ = self.sender.send(Task::Shutdown);
107 if let Some(handle) = self.handle.take() {
108 handle.join().unwrap();
109 }
110 }
111}