mqrs: Implement send and recv of sysvmq [wip]

TBD: CHANGELOG.md and adapt to recent changes is sysvmq
This commit is contained in:
finga 2023-10-27 19:15:04 +02:00
parent 47d3b77f39
commit 59dff353ff
5 changed files with 159 additions and 15 deletions

View file

@ -1,5 +1,7 @@
use anyhow::Result;
use clap::{ArgAction, Parser};
use clap::Parser;
use log::Level;
use std::env;
mod posix;
mod sysv;
@ -30,6 +32,8 @@ enum SysvCommand {
Info(sysv::Info),
List(sysv::List),
Unlink(sysv::Unlink),
Send(sysv::Send),
Recv(sysv::Recv),
}
#[derive(Debug, Parser)]
@ -39,9 +43,9 @@ enum SysvCommand {
subcommand_required = true
)]
struct Opts {
/// Produce verbose output, multiple -v options increase the verbosity (max. 3)
#[clap(short, long, global = true, action = ArgAction::Count)]
verbose: u32,
/// Set a log level
#[arg(short, long, value_name = "LEVEL", default_value_t = Level::Info)]
pub log_level: Level,
/// Backend to be used
#[clap(subcommand)]
backend: Backend,
@ -50,15 +54,11 @@ struct Opts {
fn main() -> Result<()> {
let opts: Opts = Opts::parse();
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(
match opts.verbose {
0 => "warn",
1 => "info",
2 => "debug",
_ => "trace",
},
))
.init();
if env::var("RUST_LOG").is_err() {
env::set_var("RUST_LOG", format!("{}", opts.log_level));
}
env_logger::init();
match opts.backend {
Backend::Posix(p) => match p {
@ -74,6 +74,8 @@ fn main() -> Result<()> {
SysvCommand::Info(i) => i.run()?,
SysvCommand::List(l) => l.run()?,
SysvCommand::Unlink(u) => u.run()?,
SysvCommand::Send(s) => s.run()?,
SysvCommand::Recv(r) => r.run()?,
},
}

View file

@ -1,9 +1,13 @@
mod create;
mod info;
mod list;
mod recv;
mod send;
mod unlink;
pub use create::Create;
pub use info::Info;
pub use list::List;
pub use recv::Recv;
pub use send::Send;
pub use unlink::Unlink;

42
src/sysv/recv.rs Normal file
View file

@ -0,0 +1,42 @@
use anyhow::Result;
// use chrono::DateTime;
use clap::Parser;
// use humantime::Duration;
use log::info;
use sysvmq::SysvMq;
/// Send a message to a message queue
#[derive(Debug, Parser)]
pub struct Recv {
// /// Set a different priority, priority >= 0
// #[clap(short, long, default_value = "0")]
// priority: u32,
// /// Do not block
// #[clap(short, long)]
// non_blocking: bool,
// /// Timeout, example "5h 23min 42ms"
// #[clap(short = 'o', long, conflicts_with = "deadline")]
// timeout: Option<String>,
// /// Deadline until messages are sent (format: "%Y-%m-%d %H:%M:%S")
// #[clap(short, long, conflicts_with = "timeout")]
// deadline: Option<String>,
// /// Name of the queue
// #[clap(value_name = "QUEUE")]
// queue: String,
/// Key of of the queue to write to
#[clap(value_name = "Key")]
key: i32,
// /// Message to be sent to the queue
// #[clap(value_name = "MESSAGE")]
// msg: String,
}
impl Recv {
pub fn run(&self) -> Result<()> {
let mq = SysvMq::<String>::new();
mq.open(self.key)?.recv();
Ok(())
}
}

42
src/sysv/send.rs Normal file
View file

@ -0,0 +1,42 @@
use anyhow::Result;
// use chrono::DateTime;
use clap::Parser;
// use humantime::Duration;
use log::info;
use sysvmq::SysvMq;
/// Send a message to a message queue
#[derive(Debug, Parser)]
pub struct Send {
// /// Set a different priority, priority >= 0
// #[clap(short, long, default_value = "0")]
// priority: u32,
// /// Do not block
// #[clap(short, long)]
// non_blocking: bool,
// /// Timeout, example "5h 23min 42ms"
// #[clap(short = 'o', long, conflicts_with = "deadline")]
// timeout: Option<String>,
// /// Deadline until messages are sent (format: "%Y-%m-%d %H:%M:%S")
// #[clap(short, long, conflicts_with = "timeout")]
// deadline: Option<String>,
// /// Name of the queue
// #[clap(value_name = "QUEUE")]
// queue: String,
/// Key of of the queue to write to
#[clap(value_name = "Key")]
key: i32,
/// Message to be sent to the queue
#[clap(value_name = "MESSAGE")]
msg: String,
}
impl Send {
pub fn run(&self) -> Result<()> {
let mq = SysvMq::<String>::new();
mq.open(self.key)?.send(self.msg.as_bytes());
Ok(())
}
}

View file

@ -1,6 +1,24 @@
use libc::{
c_void, msgctl, msgget, msgrcv, msgsnd, msqid_ds, IPC_CREAT, IPC_INFO, IPC_NOWAIT, IPC_RMID,
MSG_INFO, MSG_STAT,
c_void,
msgctl,
msgget,
msgrcv,
msgsnd,
// msginfo,
msqid_ds,
IPC_CREAT,
// IPC_EXCL,
IPC_INFO,
IPC_NOWAIT,
// IPC_PRIVATE,
IPC_RMID,
// IPC_SET,
// IPC_STAT,
// MSG_COPY,
// MSG_EXCEPT,
MSG_INFO,
// MSG_NOERROR,
MSG_STAT,
};
use nix::errno::{errno, Errno};
use std::{mem::MaybeUninit, ptr};
@ -12,6 +30,42 @@ pub enum SysvMqError {
ErrnoError(&'static str),
}
// /// IPC bit flags
// #[repr(i32)]
// enum Flags {
// /// Create key if key does not exist.
// CreateKey = IPC_CREAT,
// // /// Fail if key exists.
// // Exclusive = IPC_EXCL,
// /// Return error on wait.
// NoWait = IPC_NOWAIT,
// // /// No error if message is too big.
// // NoError = MSG_NOERROR,
// // /// Receive any message except of specified type.
// // Except = MSG_EXCEPT,
// // /// Copy (not remove) all queue messages.
// // Copy = MSG_COPY,
// // /// Private key (Special key value).
// // Private = IPC_PRIVATE,
// }
// /// Commands for `msgctl()`
// #[repr(i32)]
// enum ControlCommands {
// /// Remove identifier (Control command for `msgctl`, `semctl`, and `shmctl`).
// Remove = IPC_RMID,
// // /// Set `ipc_perm` options (Control command for `msgctl`, `semctl`, and `shmctl`).
// // SetPerm = IPC_SET,
// // /// Get `ipc_perm` options (Control command for `msgctl`, `semctl`, and `shmctl`).
// // GetPerm = IPC_STAT,
// /// See ipcs (Control command for `msgctl`, `semctl`, and `shmctl`).
// IpcInfo = IPC_INFO,
// /// IPCS control command.
// Stat = MSG_STAT,
// /// IPCS control command.
// MsgInfo = MSG_INFO,
// }
fn msgctl_wrapper(msqid: i32, cmd: i32, buf: *mut msqid_ds) -> Result<i32, SysvMqError> {
match unsafe { msgctl(msqid, cmd, buf) } {
-1 => Err(SysvMqError::ErrnoError(Errno::from_i32(errno()).desc())),