sysvmq: Implement send and recv and refactor

Implement sending to and receiving from SysV IPC message queues. While
at it refactor the whole library. Write documentation for the sysvmq
library.
This commit is contained in:
finga 2023-10-27 19:08:26 +02:00
parent 2c4d04b374
commit 79a2ad1d88
3 changed files with 413 additions and 114 deletions

View file

@ -8,8 +8,37 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased] ## [Unreleased]
### Added
- Basic tests to create, send, receive and delete a message queue as
well as changing the mode of a queue.
- The functions `SysvMq::stat()`, `SysvMq::info()`,
`SysvMq::msg_stat()` and `SysvMq::msg_info()` are added to receive
parameters of a queue.
- The function `new()` passes now a key which is used for the new
message queue.
- A basic test to test the happy path of a queue.
- The function `get()` to gather facts from a message queue.
- The function `set()` to set facts of a message queue.
- The function `create()` to create a message queue.
- The functions `send()` and `SysvMq::send()` to send messages to a
queue.
- The functions `recv()` and `SysvMq::recv()` to receive messages from
a queue.
- The function `SysvMq::delete()` to delete a queue.
### Changed ### Changed
- The function `SysvMq::mode()` now also updates the message queue
accordingly.
- Remove `PhantomData` from the `SysvMq` struct.
- The function `SysvMq::create()` is replaced by `SysvMq::new()`.
- Remove generic type for `SysvMq`.
- Rename `unlink_id()` to `delete()`.
- Update to the latest verscion of `nix`. - Update to the latest verscion of `nix`.
- Fix several clippy findings in preperation to enable several lint - Fix several clippy findings in preperation to enable several lint
groups. groups.
### Removed
- The function `id_from_key()` was removed.

View file

@ -1,24 +1,216 @@
#![allow(clippy::doc_markdown)]
//! This library provides a convenient API to SysV IPC message queues.
//!
//! # Example
//!
//! ```rust
//! use sysvmq::{SysvMq, SysvMqError};
//!
//! fn example() -> Result<(), SysvMqError> {
//! let mut mq = SysvMq::new(0)?;
//! let mut buf = [0u8; 11];
//!
//! mq.send(b"hello queue")?;
//! mq.recv(&mut buf)?;
//! mq.delete()?;
//!
//! Ok(())
//! }
//! ```
use libc::{ use libc::{
msgctl, msgget, msqid_ds, IPC_CREAT, IPC_INFO, IPC_NOWAIT, IPC_RMID, MSG_INFO,g MSG_STAT, c_void, msgctl, msgget, msgrcv, msgsnd, msqid_ds, IPC_CREAT, IPC_NOWAIT, IPC_RMID, IPC_SET,
}; };
pub use libc::{IPC_INFO, IPC_STAT, MSG_INFO, MSG_STAT};
use nix::errno::{errno, Errno}; use nix::errno::{errno, Errno};
use std::{marker::PhantomData, mem::MaybeUninit, ptr}; use std::{convert::TryFrom, mem::MaybeUninit, ptr};
use thiserror::Error; use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
/// An enum containing all errors
pub enum SysvMqError { pub enum SysvMqError {
#[error("SysV message queue: {0}")] #[error("SysV message queue: {0}")]
ErrnoError(&'static str), ErrnoError(&'static str),
#[error("Cannot convert integer")]
From(#[from] std::num::TryFromIntError),
} }
#[allow(clippy::doc_markdown)] /// Low level function to create a new SysV IPC message queue.
/// Unlink (delete) an existing SysV IPC message queue. ///
/// # Example
///
/// ```rust
/// # use sysvmq::delete;
/// use sysvmq::{create, SysvMqError};
///
/// fn main() -> Result<(), SysvMqError> {
/// let mq_id = create(0, 0o644)?;
/// println!("new queue: {mq_id}");
/// # delete(mq_id)?;
///
/// Ok(())
/// }
/// ```
/// ///
/// # Errors /// # Errors
/// ///
/// Return an `SysvMqError` when no queue with the given key can be /// Return `SysvMqError` when the queue cannot be created.
pub fn create(key: i32, mode: i32) -> Result<i32, SysvMqError> {
let mq = unsafe { msgget(key, IPC_CREAT | mode) };
match mq {
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
id => Ok(id),
}
}
/// Low level function to send a message to a SysV IPC message queue.
///
/// # Example
///
/// ```rust
/// # use sysvmq::{delete};
/// use sysvmq::{create, send, SysvMqError};
///
/// fn main() -> Result<(), SysvMqError> {
/// let mq_id = create(0, 0o644)?;
/// let msg = b"hello queue";
/// send(mq_id, msg, 0)?;
/// # delete(mq_id)?;
///
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// Return `SysvMqError` when the message cannot be sent to the queue.
pub fn send(id: i32, msg: &[u8], mask: i32) -> Result<(), SysvMqError> {
match unsafe { msgsnd(id, msg.as_ptr().cast::<c_void>(), msg.len(), mask) } {
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
_ => Ok(()),
}
}
/// Low level function to receive a message from a SysV IPC message
/// queue.
///
/// # Example
///
/// ```rust
/// # use sysvmq::{delete, send};
/// use sysvmq::{create, recv, SysvMqError};
///
/// fn main() -> Result<(), SysvMqError> {
/// let mq_mask = 0o644;
/// let mq_id = create(0, mq_mask)?;
/// let mut buf = [0u8; 32];
/// # let msg = b"hello queue";
/// # send(mq_id, msg, 0)?;
/// recv(mq_id, &mut buf, mq_mask)?;
/// println!("received message: {:?}", buf);
/// # delete(mq_id)?;
///
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// Return `SysvMqError` when the message cannot be received from the
/// queue.
pub fn recv(id: i32, msg: &mut [u8], mask: i32) -> Result<(), SysvMqError> {
match unsafe { msgrcv(id, msg.as_mut_ptr().cast::<c_void>(), msg.len(), 0, mask) } {
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
_ => Ok(()),
}
}
/// Low level function to get parameters of a SysV IPC message queue.
///
/// # Example
///
/// ```rust
/// # use sysvmq::{delete};
/// use sysvmq::{create, get, IPC_STAT, SysvMqError};
///
/// fn main() -> Result<(), SysvMqError> {
/// let mq_id = create(0, 0o644)?;
/// let mq_stat = get(mq_id, IPC_STAT)?;
/// println!("received message: {:?}", mq_stat);
/// # delete(mq_id)?;
///
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// Return `SysvMqError` when the parameters cannot be gathered.
pub fn get(id: i32, cmd: i32) -> Result<msqid_ds, SysvMqError> {
let mut ipc_info = MaybeUninit::<msqid_ds>::uninit();
let ret;
let ipc_info = unsafe {
ret = msgctl(id, cmd, ipc_info.as_mut_ptr());
ipc_info.assume_init()
};
match ret {
0 => Ok(ipc_info),
_ => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
}
}
/// Low level function to set parameters of a SysV IPC message queue.
///
/// # Example
///
/// ```rust
/// # use sysvmq::{delete};
/// use sysvmq::{create, get, IPC_STAT, set, SysvMqError};
///
/// fn main() -> Result<(), SysvMqError> {
/// let mq_id = create(0, 0o644)?;
/// let mut mq_stat = get(mq_id, IPC_STAT)?;
/// mq_stat.msg_perm.mode = 0o666;
/// set(mq_id, &mut mq_stat)?;
/// # delete(mq_id)?;
///
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// Return `SysvMqError` when the parameters cannot be set.
pub fn set(id: i32, data: &mut msqid_ds) -> Result<(), SysvMqError> {
match unsafe { msgctl(id, IPC_SET, data) } {
0 => Ok(()),
_ => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
}
}
/// Low level function to delete a SysV IPC message queue.
///
/// # Example
///
/// ```rust
/// use sysvmq::{create, delete, SysvMqError};
///
/// fn main() -> Result<(), SysvMqError> {
/// let mq_id = create(0, 0o644)?;
/// delete(mq_id)?;
///
/// Ok(())
/// }
/// ```
///
/// # Errors
///
/// Return `SysvMqError` when no queue with the given key can be
/// found. /// found.
pub fn unlink_id(id: i32) -> Result<(), SysvMqError> { pub fn delete(id: i32) -> Result<(), SysvMqError> {
let res = unsafe { msgctl(id, IPC_RMID, ptr::null::<msqid_ds>().cast_mut()) }; let res = unsafe { msgctl(id, IPC_RMID, ptr::null::<msqid_ds>().cast_mut()) };
match res { match res {
@ -27,127 +219,137 @@ pub fn unlink_id(id: i32) -> Result<(), SysvMqError> {
} }
} }
#[allow(clippy::doc_markdown)] #[derive(Clone, Debug)]
/// Get the id of an existing SysV IPC message queue by passing its /// Struct representation of a Message Queue
pub struct SysvMq {
pub key: i32,
pub id: i32,
pub mask: i32,
pub mode: i32,
}
impl Default for SysvMq {
fn default() -> Self {
Self {
key: 0,
id: -1,
mask: 0,
mode: 0o644,
}
}
}
impl SysvMq {
/// Create a new message SysV IPC message queue with the given
/// key. /// key.
/// ///
/// # Errors /// # Errors
/// ///
/// Return an `SysvMqError` when no queue with the given key can be /// Return `SysvMqError` when the queue cannot be created.
/// found. pub fn new(key: i32) -> Result<Self, SysvMqError> {
pub fn id_from_key(key: i32) -> Result<i32, SysvMqError> { let mut mq = Self::default();
let id = unsafe { msgget(key, 0) }; mq.key = key;
mq.id = create(mq.key, mq.mode)?;
match id { Ok(mq)
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
id => Ok(id),
}
} }
#[allow(clippy::doc_markdown)] /// Open an existing SysV IPC message queye with the given key.
/// Get the info about an existing SysV IPC message queue.
pub fn ipc_info(id: i32) {
let mut ipc_info = MaybeUninit::<msqid_ds>::uninit();
let ipc_info = unsafe {
msgctl(id, IPC_INFO, ipc_info.as_mut_ptr());
ipc_info.assume_init()
};
println!("info: {ipc_info:?}");
}
#[allow(clippy::doc_markdown)]
/// Get the stats about an existing SysV IPC message queue.
pub fn stat_info(id: i32) {
let mut stat_info = MaybeUninit::<msqid_ds>::uninit();
let stat_info = unsafe {
msgctl(id, MSG_STAT, stat_info.as_mut_ptr());
stat_info.assume_init()
};
println!("info: {stat_info:?}");
}
#[allow(clippy::doc_markdown)]
/// Get the message info about an existing SysV IPC message queue.
pub fn msg_info(id: i32) {
let mut msg_info = MaybeUninit::<msqid_ds>::uninit();
let msg_info = unsafe {
msgctl(id, MSG_INFO, msg_info.as_mut_ptr());
msg_info.assume_init()
};
println!("info: {msg_info:?}");
}
pub struct SysvMq<T> {
pub id: i32,
pub key: i32,
message_mask: i32,
mode: i32,
types: PhantomData<T>,
}
impl<T> SysvMq<T> {
/// Create a new message queye with the given key.
/// ///
/// # Errors /// # Errors
/// ///
/// Return an `SysvMqError` when no queue with the given key can be /// Return `SysvMqError` when the queue cannot be opened.
/// created. pub fn open(key: i32, id: i32) -> Result<Self, SysvMqError> {
pub fn create(&mut self, key: i32) -> Result<&Self, SysvMqError> { let mut mq = Self::default();
self.key = key; mq.key = key;
self.id = unsafe { msgget(self.key, IPC_CREAT | self.mode) }; mq.id = id;
mq.mode = i32::from(get(mq.id, IPC_STAT)?.msg_perm.mode);
match self.id { Ok(mq)
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
_ => Ok(self),
}
} }
/// Open an existing message queye with the given key. /// Set the mode of a SysV IPC message queue.
/// ///
/// # Errors /// # Errors
/// ///
/// Return an `SysvMqError` when no queue with the given key can be /// Return `SysvMqError` when the mode of the queue cannot be set.
/// found. pub fn mode(&mut self, mode: i32) -> Result<(), SysvMqError> {
pub fn open(mut self, key: i32) -> Result<Self, SysvMqError> {
self.key = key;
self.id = unsafe { msgget(self.key, self.mode) };
match self.id {
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),
_ => Ok(self),
}
}
pub fn mode(&mut self, mode: i32) -> &Self {
self.mode = mode; self.mode = mode;
self let mut stats = get(self.id, IPC_STAT)?;
stats.msg_perm.mode = u16::try_from(self.mode)?;
set(self.id, &mut stats)
} }
pub fn nonblocking(&mut self) -> &Self { /// Set the mask of a SysV IPC message queue to nonblocking
self.message_mask |= IPC_NOWAIT; /// (`IPC_NOWAIT`).
self pub fn nonblocking(&mut self) {
self.mask |= IPC_NOWAIT;
} }
#[must_use] /// Send a message to a SysV IPC message queue.
pub const fn new() -> Self { ///
Self { /// # Errors
id: -1, ///
key: 0, /// Return `SysvMqError` when the message cannot be sent to the queue.
message_mask: 0, pub fn send(&mut self, msg: &[u8]) -> Result<(), SysvMqError> {
mode: 0o644, send(self.id, msg, self.mask)
types: PhantomData,
}
}
} }
impl<T> Default for SysvMq<T> { /// Receive a message from a SysV IPC message queue.
fn default() -> Self { ///
Self::new() /// # Errors
///
/// Return `SysvMqError` when the message cannot be received from the
/// queue.
pub fn recv(&mut self, msg: &mut [u8]) -> Result<(), SysvMqError> {
recv(self.id, msg, self.mask)
}
/// Receive stats from a SysV IPC message queue.
///
/// # Errors
///
/// Return `SysvMqError` when the stats cannot be gathered from
/// the queue.
pub fn stat(&self) -> Result<msqid_ds, SysvMqError> {
get(self.id, IPC_STAT)
}
/// Receive info from a SysV IPC message queue.
///
/// # Errors
///
/// Return `SysvMqError` when the info cannot be gathered from the
/// queue.
pub fn info(&self) -> Result<msqid_ds, SysvMqError> {
get(self.id, IPC_INFO)
}
/// Receive message stats from a SysV IPC message queue.
///
/// # Errors
///
/// Return `SysvMqError` when the message stats cannot be gathered
/// from the queue.
pub fn msg_stat(&self) -> Result<msqid_ds, SysvMqError> {
get(self.id, MSG_STAT)
}
/// Receive message info from a SysV IPC message queue.
///
/// # Errors
///
/// Return `SysvMqError` when the message info cannot be gathered
/// from the queue.
pub fn msg_info(&self) -> Result<msqid_ds, SysvMqError> {
get(self.id, MSG_INFO)
}
/// Delete an existing SysV IPC message queue.
///
/// # Errors
///
/// Return an `SysvMqError` when no queue with the given key can be
/// found.
pub fn delete(&mut self) -> Result<(), SysvMqError> {
delete(self.id)
} }
} }

68
sysvmq/tests/main.rs Normal file
View file

@ -0,0 +1,68 @@
use sysvmq::SysvMq;
#[test]
fn new_send_recv_delete() {
let mut mq = SysvMq::new(0).expect("could not create SysV message queue with key 0");
let msg = b"this is a test";
let mut buf = [0u8; 14];
mq.send(msg)
.expect("could not send message to SysV message queue");
mq.recv(&mut buf)
.expect("could not receive message from SysV message queue");
mq.delete().expect("could not destroy SysV message queue");
assert_eq!(msg, &buf);
}
#[test]
fn separate_send_recv_clone() {
let mut sender = SysvMq::new(0).expect("could not create SysV message queue with key 0");
let mut receiver = sender.clone();
let msg = b"this is another test";
let mut buf = [0u8; 20];
sender
.send(msg)
.expect("could not send message to SysV message queue");
receiver
.recv(&mut buf)
.expect("could not receive message from SysV message queue");
sender
.delete()
.expect("could not destroy SysV message queue");
assert_eq!(msg, &buf);
}
#[test]
fn separate_send_recv_open() {
let mut sender = SysvMq::new(0).expect("could not create SysV message queue with key 0");
let mut receiver = SysvMq::open(sender.key, sender.id)
.expect("could not create second handle to SysV message queue");
let msg = b"this is another test";
let mut buf = [0u8; 20];
sender
.send(msg)
.expect("could not send message to SysV message queue");
receiver
.recv(&mut buf)
.expect("could not receive message from SysV message queue");
sender
.delete()
.expect("could not destroy SysV message queue");
assert_eq!(msg, &buf);
}
#[test]
fn change_mode() {
let mut mq = SysvMq::new(0).expect("could not create SysV message queue with key 0");
let init_stats = mq
.stat()
.expect("could not get stats from SysV message queue");
mq.mode(0o666)
.expect("could not set mode of SysV message queue");
let new_stats = mq
.stat()
.expect("could not get stats from SysV message queue");
mq.delete().expect("could not destroy SysV message queue");
assert_eq!(0o644, init_stats.msg_perm.mode);
assert_eq!(0o666, new_stats.msg_perm.mode);
}