nix/
mqueue.rs

1//! Posix Message Queue functions
2//!
3//! # Example
4//!
5// no_run because a kernel module may be required.
6//! ```no_run
7//! # use std::ffi::CString;
8//! # use nix::mqueue::*;
9//! use nix::sys::stat::Mode;
10//!
11//! const MSG_SIZE: mq_attr_member_t = 32;
12//! let mq_name= CString::new("/a_nix_test_queue").unwrap();
13//!
14//! let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
15//! let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
16//! let mqd0 = mq_open(&mq_name, oflag0, mode, None).unwrap();
17//! let msg_to_send = b"msg_1";
18//! mq_send(&mqd0, msg_to_send, 1).unwrap();
19//!
20//! let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
21//! let mqd1 = mq_open(&mq_name, oflag1, mode, None).unwrap();
22//! let mut buf = [0u8; 32];
23//! let mut prio = 0u32;
24//! let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap();
25//! assert_eq!(prio, 1);
26//! assert_eq!(msg_to_send, &buf[0..len]);
27//!
28//! mq_close(mqd1).unwrap();
29//! mq_close(mqd0).unwrap();
30//! ```
31//! [Further reading and details on the C API](https://man7.org/linux/man-pages/man7/mq_overview.7.html)
32
33use crate::errno::Errno;
34use crate::Result;
35
36use crate::sys::stat::Mode;
37use libc::{self, c_char, mqd_t, size_t};
38use std::ffi::CStr;
39use std::mem;
40
41libc_bitflags! {
42    /// Used with [`mq_open`].
43    pub struct MQ_OFlag: libc::c_int {
44        /// Open the message queue for receiving messages.
45        O_RDONLY;
46        /// Open the queue for sending messages.
47        O_WRONLY;
48        /// Open the queue for both receiving and sending messages
49        O_RDWR;
50        /// Create a message queue.
51        O_CREAT;
52        /// If set along with `O_CREAT`, `mq_open` will fail if the message
53        /// queue name exists.
54        O_EXCL;
55        /// `mq_send` and `mq_receive` should fail with `EAGAIN` rather than
56        /// wait for resources that are not currently available.
57        O_NONBLOCK;
58        /// Set the close-on-exec flag for the message queue descriptor.
59        O_CLOEXEC;
60    }
61}
62
63/// A message-queue attribute, optionally used with [`mq_setattr`] and
64/// [`mq_getattr`] and optionally [`mq_open`],
65#[repr(C)]
66#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
67pub struct MqAttr {
68    mq_attr: libc::mq_attr,
69}
70
71/// Identifies an open POSIX Message Queue
72// A safer wrapper around libc::mqd_t, which is a pointer on some platforms
73// Deliberately is not Clone to prevent use-after-close scenarios
74#[repr(transparent)]
75#[derive(Debug)]
76#[allow(missing_copy_implementations)]
77pub struct MqdT(mqd_t);
78
79// x32 compatibility
80// See https://sourceware.org/bugzilla/show_bug.cgi?id=21279
81/// Size of a message queue attribute member
82#[cfg(all(target_arch = "x86_64", target_pointer_width = "32"))]
83#[cfg_attr(docsrs, doc(cfg(all())))]
84pub type mq_attr_member_t = i64;
85/// Size of a message queue attribute member
86#[cfg(not(all(target_arch = "x86_64", target_pointer_width = "32")))]
87#[cfg_attr(docsrs, doc(cfg(all())))]
88pub type mq_attr_member_t = libc::c_long;
89
90impl MqAttr {
91    /// Create a new message queue attribute
92    ///
93    /// # Arguments
94    ///
95    /// - `mq_flags`:   Either `0` or `O_NONBLOCK`.
96    /// - `mq_maxmsg`:  Maximum number of messages on the queue.
97    /// - `mq_msgsize`: Maximum message size in bytes.
98    /// - `mq_curmsgs`: Number of messages currently in the queue.
99    pub fn new(
100        mq_flags: mq_attr_member_t,
101        mq_maxmsg: mq_attr_member_t,
102        mq_msgsize: mq_attr_member_t,
103        mq_curmsgs: mq_attr_member_t,
104    ) -> MqAttr {
105        let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
106        unsafe {
107            let p = attr.as_mut_ptr();
108            (*p).mq_flags = mq_flags;
109            (*p).mq_maxmsg = mq_maxmsg;
110            (*p).mq_msgsize = mq_msgsize;
111            (*p).mq_curmsgs = mq_curmsgs;
112            MqAttr {
113                mq_attr: attr.assume_init(),
114            }
115        }
116    }
117
118    /// The current flags, either `0` or `O_NONBLOCK`.
119    pub const fn flags(&self) -> mq_attr_member_t {
120        self.mq_attr.mq_flags
121    }
122
123    /// The max number of messages that can be held by the queue
124    pub const fn maxmsg(&self) -> mq_attr_member_t {
125        self.mq_attr.mq_maxmsg
126    }
127
128    /// The maximum size of each message (in bytes)
129    pub const fn msgsize(&self) -> mq_attr_member_t {
130        self.mq_attr.mq_msgsize
131    }
132
133    /// The number of messages currently held in the queue
134    pub const fn curmsgs(&self) -> mq_attr_member_t {
135        self.mq_attr.mq_curmsgs
136    }
137}
138
139/// Open a message queue
140///
141/// See also [`mq_open(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_open.html)
142// The mode.bits cast is only lossless on some OSes
143#[allow(clippy::cast_lossless)]
144pub fn mq_open(
145    name: &CStr,
146    oflag: MQ_OFlag,
147    mode: Mode,
148    attr: Option<&MqAttr>,
149) -> Result<MqdT> {
150    let res = match attr {
151        Some(mq_attr) => unsafe {
152            libc::mq_open(
153                name.as_ptr(),
154                oflag.bits(),
155                mode.bits() as libc::c_int,
156                &mq_attr.mq_attr as *const libc::mq_attr,
157            )
158        },
159        None => unsafe { libc::mq_open(name.as_ptr(), oflag.bits()) },
160    };
161    Errno::result(res).map(MqdT)
162}
163
164/// Remove a message queue
165///
166/// See also [`mq_unlink(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_unlink.html)
167pub fn mq_unlink(name: &CStr) -> Result<()> {
168    let res = unsafe { libc::mq_unlink(name.as_ptr()) };
169    Errno::result(res).map(drop)
170}
171
172/// Close a message queue
173///
174/// See also [`mq_close(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_close.html)
175pub fn mq_close(mqdes: MqdT) -> Result<()> {
176    let res = unsafe { libc::mq_close(mqdes.0) };
177    Errno::result(res).map(drop)
178}
179
180/// Receive a message from a message queue
181///
182/// See also [`mq_receive(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_receive.html)
183pub fn mq_receive(
184    mqdes: &MqdT,
185    message: &mut [u8],
186    msg_prio: &mut u32,
187) -> Result<usize> {
188    let len = message.len() as size_t;
189    let res = unsafe {
190        libc::mq_receive(
191            mqdes.0,
192            message.as_mut_ptr() as *mut c_char,
193            len,
194            msg_prio as *mut u32,
195        )
196    };
197    Errno::result(res).map(|r| r as usize)
198}
199
200/// Send a message to a message queue
201///
202/// See also [`mq_send(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_send.html)
203pub fn mq_send(mqdes: &MqdT, message: &[u8], msq_prio: u32) -> Result<()> {
204    let res = unsafe {
205        libc::mq_send(
206            mqdes.0,
207            message.as_ptr() as *const c_char,
208            message.len(),
209            msq_prio,
210        )
211    };
212    Errno::result(res).map(drop)
213}
214
215/// Get message queue attributes
216///
217/// See also [`mq_getattr(2)`](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_getattr.html)
218pub fn mq_getattr(mqd: &MqdT) -> Result<MqAttr> {
219    let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
220    let res = unsafe { libc::mq_getattr(mqd.0, attr.as_mut_ptr()) };
221    Errno::result(res).map(|_| unsafe {
222        MqAttr {
223            mq_attr: attr.assume_init(),
224        }
225    })
226}
227
228/// Set the attributes of the message queue. Only `O_NONBLOCK` can be set, everything else will be ignored
229/// Returns the old attributes
230/// It is recommend to use the `mq_set_nonblock()` and `mq_remove_nonblock()` convenience functions as they are easier to use
231///
232/// [Further reading](https://pubs.opengroup.org/onlinepubs/9699919799/functions/mq_setattr.html)
233pub fn mq_setattr(mqd: &MqdT, newattr: &MqAttr) -> Result<MqAttr> {
234    let mut attr = mem::MaybeUninit::<libc::mq_attr>::uninit();
235    let res = unsafe {
236        libc::mq_setattr(
237            mqd.0,
238            &newattr.mq_attr as *const libc::mq_attr,
239            attr.as_mut_ptr(),
240        )
241    };
242    Errno::result(res).map(|_| unsafe {
243        MqAttr {
244            mq_attr: attr.assume_init(),
245        }
246    })
247}
248
249/// Convenience function.
250/// Sets the `O_NONBLOCK` attribute for a given message queue descriptor
251/// Returns the old attributes
252#[allow(clippy::useless_conversion)] // Not useless on all OSes
253pub fn mq_set_nonblock(mqd: &MqdT) -> Result<MqAttr> {
254    let oldattr = mq_getattr(mqd)?;
255    let newattr = MqAttr::new(
256        mq_attr_member_t::from(MQ_OFlag::O_NONBLOCK.bits()),
257        oldattr.mq_attr.mq_maxmsg,
258        oldattr.mq_attr.mq_msgsize,
259        oldattr.mq_attr.mq_curmsgs,
260    );
261    mq_setattr(mqd, &newattr)
262}
263
264/// Convenience function.
265/// Removes `O_NONBLOCK` attribute for a given message queue descriptor
266/// Returns the old attributes
267pub fn mq_remove_nonblock(mqd: &MqdT) -> Result<MqAttr> {
268    let oldattr = mq_getattr(mqd)?;
269    let newattr = MqAttr::new(
270        0,
271        oldattr.mq_attr.mq_maxmsg,
272        oldattr.mq_attr.mq_msgsize,
273        oldattr.mq_attr.mq_curmsgs,
274    );
275    mq_setattr(mqd, &newattr)
276}