1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Threading and synchronization utilities.
use std::sync::Mutex;
/// A synchronized resource lottery.
///
/// Controls access to a non-[`Sync`] resource by allowing only one thread to
/// win the resource "lottery." A dummy resource is constructed for threads
/// that lose the lottery.
///
/// # Examples
///
/// ```rust
/// # use std::io;
/// # use std::io::Write;
/// # use std::thread;
/// # use ore::sync::Lottery;
///
/// struct Discarder;
///
/// impl io::Write for Discarder {
/// fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
/// Ok(buf.len())
/// }
/// fn flush(&mut self) -> io::Result<()> {
/// Ok(())
/// }
/// }
///
/// let stderr: Box<dyn io::Write + Send> = Box::new(io::stderr());
/// let lottery = Lottery::new(stderr, || Box::new(Discarder));
/// crossbeam_utils::thread::scope(|thread_scope| {
/// (0..5)
/// .into_iter()
/// .map(|_| {
/// thread_scope.spawn(|_| {
/// write!(lottery.draw(), "Can you hear me?");
/// })
/// })
/// .for_each(|handle| handle.join().unwrap());
/// })
/// .unwrap()
/// ```
#[derive(Debug)]
pub struct Lottery<T, F>
where
F: Fn() -> T,
{
winner: Mutex<Option<T>>,
losers: F,
}
impl<T, F> Lottery<T, F>
where
F: Fn() -> T,
{
/// Creates a new `Lottery` from the specified winner object and a function
/// to construct loser objects.
pub fn new(winner: T, losers: F) -> Lottery<T, F> {
Lottery {
winner: Mutex::new(Some(winner)),
losers,
}
}
/// Attempts to win the lottery. It returns the winner resource if this is
/// the first thread to call `draw`. If another thread has already claimed
/// the winner resource, it instead constructs and returns a loser resource.
pub fn draw(&self) -> T {
let mut guard = self.winner.lock().unwrap();
match guard.take() {
Some(t) => t,
None => (self.losers)(),
}
}
}
#[cfg(test)]
mod tests {
use crossbeam_utils::thread;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use super::Lottery;
#[test]
fn test_lottery() {
let lottery = Lottery::new(true, || false);
let loser_count = Arc::new(AtomicUsize::new(0));
let winner_count = Arc::new(AtomicUsize::new(0));
thread::scope(|scope| {
for _ in 0..5 {
scope.spawn(|_| {
match lottery.draw() {
true => winner_count.fetch_add(1, Ordering::SeqCst),
false => loser_count.fetch_add(1, Ordering::SeqCst),
};
});
}
})
.unwrap();
assert_eq!(winner_count.load(Ordering::SeqCst), 1);
assert_eq!(loser_count.load(Ordering::SeqCst), 4);
}
}