rayon_core/
log.rs

//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below.  In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging
20
21use crossbeam_channel::{self, Receiver, Sender};
22use std::collections::VecDeque;
23use std::env;
24use std::fs::File;
25use std::io::{self, BufWriter, Write};
26
/// True if logs are compiled in.
///
/// Logs are compiled in for debug builds (`debug_assertions`) or when
/// Rayon is built with `--cfg rayon_rs_log`; when this is `false`,
/// `Logger::log` returns immediately without evaluating its closure.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
29
/// Events recorded by the `Logger` and replayed by `SimulatorState`.
///
/// `worker` fields are worker-thread indices; `latch_addr` fields
/// identify the latch a worker is blocked on or observed.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated execution.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// yield rounds. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter`
    /// is the internal sleep-state counter that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}
105
/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
#[derive(Clone)]
pub(super) struct Logger {
    // `None` when logging is disabled; otherwise the channel to the
    // background logger thread spawned by `Logger::new`.
    sender: Option<Sender<Event>>,
}
112
113impl Logger {
114    pub(super) fn new(num_workers: usize) -> Logger {
115        if !LOG_ENABLED {
116            return Self::disabled();
117        }
118
119        // see the doc comment for the format
120        let env_log = match env::var("RAYON_LOG") {
121            Ok(s) => s,
122            Err(_) => return Self::disabled(),
123        };
124
125        let (sender, receiver) = crossbeam_channel::unbounded();
126
127        if env_log.starts_with("tail:") {
128            let filename = env_log["tail:".len()..].to_string();
129            ::std::thread::spawn(move || {
130                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
131            });
132        } else if env_log == "all" {
133            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
134        } else if env_log.starts_with("profile:") {
135            let filename = env_log["profile:".len()..].to_string();
136            ::std::thread::spawn(move || {
137                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
138            });
139        } else {
140            panic!("RAYON_LOG should be 'tail:<file>' or 'profile:<file>'");
141        }
142
143        return Logger {
144            sender: Some(sender),
145        };
146    }
147
148    fn disabled() -> Logger {
149        Logger { sender: None }
150    }
151
152    #[inline]
153    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
154        if !LOG_ENABLED {
155            return;
156        }
157
158        if let Some(sender) = &self.sender {
159            sender.send(event()).unwrap();
160        }
161    }
162
163    fn profile_logger_thread(
164        num_workers: usize,
165        log_filename: String,
166        capacity: usize,
167        receiver: Receiver<Event>,
168    ) {
169        let file = File::create(&log_filename)
170            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
171
172        let mut writer = BufWriter::new(file);
173        let mut events = Vec::with_capacity(capacity);
174        let mut state = SimulatorState::new(num_workers);
175        let timeout = std::time::Duration::from_secs(30);
176
177        loop {
178            loop {
179                match receiver.recv_timeout(timeout) {
180                    Ok(event) => {
181                        if let Event::Flush = event {
182                            break;
183                        } else {
184                            events.push(event);
185                        }
186                    }
187
188                    Err(_) => break,
189                }
190
191                if events.len() == capacity {
192                    break;
193                }
194            }
195
196            for event in events.drain(..) {
197                if state.simulate(&event) {
198                    state.dump(&mut writer, &event).unwrap();
199                }
200            }
201
202            writer.flush().unwrap();
203        }
204    }
205
206    fn tail_logger_thread(
207        num_workers: usize,
208        log_filename: String,
209        capacity: usize,
210        receiver: Receiver<Event>,
211    ) {
212        let file = File::create(&log_filename)
213            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));
214
215        let mut writer = BufWriter::new(file);
216        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
217        let mut state = SimulatorState::new(num_workers);
218        let timeout = std::time::Duration::from_secs(30);
219        let mut skipped = false;
220
221        loop {
222            loop {
223                match receiver.recv_timeout(timeout) {
224                    Ok(event) => {
225                        if let Event::Flush = event {
226                            // We ignore Flush events in tail mode --
227                            // we're really just looking for
228                            // deadlocks.
229                            continue;
230                        } else {
231                            if events.len() == capacity {
232                                let event = events.pop_front().unwrap();
233                                state.simulate(&event);
234                                skipped = true;
235                            }
236
237                            events.push_back(event);
238                        }
239                    }
240
241                    Err(_) => break,
242                }
243            }
244
245            if skipped {
246                write!(writer, "...\n").unwrap();
247                skipped = false;
248            }
249
250            for event in events.drain(..) {
251                // In tail mode, we dump *all* events out, whether or
252                // not they were 'interesting' to the state machine.
253                state.simulate(&event);
254                state.dump(&mut writer, &event).unwrap();
255            }
256
257            writer.flush().unwrap();
258        }
259    }
260
261    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
262        let stderr = std::io::stderr();
263        let mut state = SimulatorState::new(num_workers);
264
265        for event in receiver {
266            let mut writer = BufWriter::new(stderr.lock());
267            state.simulate(&event);
268            state.dump(&mut writer, &event).unwrap();
269            writer.flush().unwrap();
270        }
271    }
272}
273
/// Simulated state of one worker thread, as reconstructed by
/// `SimulatorState` from the event stream.
///
/// NOTE: `PartialOrd`/`Ord` are derived, so the declaration order of
/// the variants defines their ordering -- do not reorder them.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}
282
283impl State {
284    fn letter(&self) -> char {
285        match self {
286            State::Working => 'W',
287            State::Idle => 'I',
288            State::Notified => 'N',
289            State::Sleeping => 'S',
290            State::Terminated => 'T',
291        }
292    }
293}
294
/// Replays the event stream to track what each worker is doing and how
/// many jobs are queued, so dumps can include derived counts.
struct SimulatorState {
    // Number of jobs currently in each worker's local deque.
    local_queue_size: Vec<usize>,
    // Current simulated state of each worker thread.
    thread_states: Vec<State>,
    // Number of jobs currently in the global injector queue.
    injector_size: usize,
}
300
301impl SimulatorState {
302    fn new(num_workers: usize) -> Self {
303        Self {
304            local_queue_size: (0..num_workers).map(|_| 0).collect(),
305            thread_states: (0..num_workers).map(|_| State::Working).collect(),
306            injector_size: 0,
307        }
308    }
309
310    fn simulate(&mut self, event: &Event) -> bool {
311        match *event {
312            Event::ThreadIdle { worker, .. } => {
313                assert_eq!(self.thread_states[worker], State::Working);
314                self.thread_states[worker] = State::Idle;
315                true
316            }
317
318            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
319                self.thread_states[worker] = State::Working;
320                true
321            }
322
323            Event::ThreadTerminate { worker, .. } => {
324                self.thread_states[worker] = State::Terminated;
325                true
326            }
327
328            Event::ThreadSleeping { worker, .. } => {
329                assert_eq!(self.thread_states[worker], State::Idle);
330                self.thread_states[worker] = State::Sleeping;
331                true
332            }
333
334            Event::ThreadAwoken { worker, .. } => {
335                assert_eq!(self.thread_states[worker], State::Notified);
336                self.thread_states[worker] = State::Idle;
337                true
338            }
339
340            Event::JobPushed { worker } => {
341                self.local_queue_size[worker] += 1;
342                true
343            }
344
345            Event::JobPopped { worker } => {
346                self.local_queue_size[worker] -= 1;
347                true
348            }
349
350            Event::JobStolen { victim, .. } => {
351                self.local_queue_size[victim] -= 1;
352                true
353            }
354
355            Event::JobsInjected { count } => {
356                self.injector_size += count;
357                true
358            }
359
360            Event::JobUninjected { .. } => {
361                self.injector_size -= 1;
362                true
363            }
364
365            Event::ThreadNotify { worker } => {
366                // Currently, this log event occurs while holding the
367                // thread lock, so we should *always* see it before
368                // the worker awakens.
369                assert_eq!(self.thread_states[worker], State::Sleeping);
370                self.thread_states[worker] = State::Notified;
371                true
372            }
373
374            // remaining events are no-ops from pov of simulating the
375            // thread state
376            _ => false,
377        }
378    }
379
380    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
381        let num_idle_threads = self
382            .thread_states
383            .iter()
384            .filter(|s| **s == State::Idle)
385            .count();
386
387        let num_sleeping_threads = self
388            .thread_states
389            .iter()
390            .filter(|s| **s == State::Sleeping)
391            .count();
392
393        let num_notified_threads = self
394            .thread_states
395            .iter()
396            .filter(|s| **s == State::Notified)
397            .count();
398
399        let num_pending_jobs: usize = self.local_queue_size.iter().sum();
400
401        write!(w, "{:2},", num_idle_threads)?;
402        write!(w, "{:2},", num_sleeping_threads)?;
403        write!(w, "{:2},", num_notified_threads)?;
404        write!(w, "{:4},", num_pending_jobs)?;
405        write!(w, "{:4},", self.injector_size)?;
406
407        let event_str = format!("{:?}", event);
408        write!(w, r#""{:60}","#, event_str)?;
409
410        for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
411            write!(w, " T{:02},{}", i, state.letter(),)?;
412
413            if *queue_size > 0 {
414                write!(w, ",{:03},", queue_size)?;
415            } else {
416                write!(w, ",   ,")?;
417            }
418        }
419
420        write!(w, "\n")?;
421        Ok(())
422    }
423}