Remove pub declarations and move to posix::*

This is in order to prepare also for SysV message queues.
This commit is contained in:
finga 2021-06-30 22:46:33 +02:00
parent 606b4de524
commit c2ad74ff15
7 changed files with 24 additions and 18 deletions

63
src/posix/create.rs Normal file
View file

@ -0,0 +1,63 @@
use anyhow::Result;
use clap::Clap;
use posixmq::PosixMq;
use std::fs;
/// Create a POSIX message queue
#[derive(Clap, Debug)]
pub struct Create {
/// Permissions (octal) to create the queue with
#[clap(short, long)]
permissions: Option<String>,
/// Maximum number of messages in the queue
#[clap(short, long)]
capacity: Option<usize>,
/// Message size in bytes
#[clap(short = 's', long)]
msgsize: Option<usize>,
/// Name of the new queue
#[clap(value_name = "QUEUE")]
queue: String,
}
fn msgsize_default() -> usize {
match fs::read_to_string("/proc/sys/fs/mqueue/msgsize_default") {
Ok(m) => m.trim().parse::<usize>().expect("can never fail"),
_ => 8192,
}
}
fn msg_default() -> usize {
match fs::read_to_string("/proc/sys/fs/mqueue/msg_default") {
Ok(m) => m.trim().parse::<usize>().expect("can never fail"),
_ => 10,
}
}
impl Create {
pub fn run(&self, verbose: bool) -> Result<()> {
let mq = &mut posixmq::OpenOptions::readonly();
if let Some(m) = &self.permissions {
mq.mode(u32::from_str_radix(&m, 8)?);
}
mq.max_msg_len(self.msgsize.unwrap_or_else(msgsize_default))
.capacity(self.capacity.unwrap_or_else(msg_default))
.create_new()
.open(&self.queue)?;
if verbose {
let mq = PosixMq::open(&self.queue)?;
let attributes = mq.attributes()?;
println!("Created message queue: {} with attributes msgsize: {}, capacity: {}, current_messages: {}",
&self.queue,
&attributes.max_msg_len,
&attributes.capacity,
&attributes.current_messages);
}
Ok(())
}
}

24
src/posix/info.rs Normal file
View file

@ -0,0 +1,24 @@
use anyhow::Result;
use clap::Clap;
use posixmq::PosixMq;
/// Print information about an existing message queue
#[derive(Clap, Debug)]
pub struct Info {
/// Name of the queue
#[clap(value_name = "QUEUE")]
pub queue: String,
}
impl Info {
pub fn run(&self) -> Result<()> {
let attrs = PosixMq::open(&self.queue)?.attributes()?;
println!(
"Message queue: {}, msg_max: {}, msgsize_max: {}, current_messages: {}",
&self.queue, &attrs.capacity, &attrs.max_msg_len, &attrs.current_messages
);
Ok(())
}
}

86
src/posix/recv.rs Normal file
View file

@ -0,0 +1,86 @@
use anyhow::Result;
use chrono::{DateTime, Local};
use clap::Clap;
use humantime::Duration;
use posixmq::PosixMq;
use std::str;
/// Receive and print a message from a message queue
#[derive(Clap, Debug)]
pub struct Recv {
/// Do not block
#[clap(short, long)]
pub non_blocking: bool,
/// Print messages as they are received
#[clap(short, long)]
pub follow: bool,
/// Print a timestamp before each message
#[clap(short, long)]
pub timestamp: bool,
/// Timeout, example "5h 23min 42ms"
#[clap(short = 'o', long, conflicts_with = "deadline")]
pub timeout: Option<String>,
/// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S")
#[clap(short, long, conflicts_with = "timeout")]
pub deadline: Option<String>,
/// Name of the queue
#[clap(value_name = "QUEUE")]
pub queue: String,
}
fn print_message(verbose: bool, priority: u32, length: usize, timestamp: bool, msg: &str) {
if verbose {
println!("Priority: {}, length: {}", priority, length);
}
if timestamp {
println!("{}", Local::now());
}
println!("{}", msg);
}
impl Recv {
fn receive(&self, verbose: bool, mq: &PosixMq) -> Result<()> {
let mut buf = vec![0; mq.attributes()?.max_msg_len];
if let Some(timeout) = &self.timeout {
let (prio, len) = mq.recv_timeout(&mut buf, *timeout.parse::<Duration>()?)?;
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
} else if let Some(deadline) = &self.deadline {
let (prio, len) = mq.recv_deadline(
&mut buf,
DateTime::parse_from_str(deadline, "%Y-%m-%d %H:%M:%S")?.into(),
)?;
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
} else {
let (prio, len) = mq.recv(&mut buf)?;
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
}
Ok(())
}
pub fn run(&self, verbose: bool) -> Result<()> {
let mq = &mut posixmq::OpenOptions::readonly();
if self.non_blocking {
mq.nonblocking();
}
let mq = mq.open(&self.queue)?;
if self.follow {
loop {
self.receive(verbose, &mq)?;
}
} else {
self.receive(verbose, &mq)?;
}
Ok(())
}
}

68
src/posix/send.rs Normal file
View file

@ -0,0 +1,68 @@
use anyhow::Result;
use chrono::DateTime;
use clap::Clap;
use humantime::Duration;
/// Send a message to a message queue
#[derive(Clap, Debug)]
pub struct Send {
/// Set a different priority, priority >= 0
#[clap(short, long, default_value = "0")]
pub priority: u32,
/// Do not block
#[clap(short, long)]
pub non_blocking: bool,
/// Timeout, example "5h 23min 42ms"
#[clap(short = 'o', long, conflicts_with = "deadline")]
pub timeout: Option<String>,
/// Deadline until messages are sent (format: "%Y-%m-%d %H:%M:%S")
#[clap(short, long, conflicts_with = "timeout")]
pub deadline: Option<String>,
/// Name of the queue
#[clap(value_name = "QUEUE")]
pub queue: String,
/// Message to be sent to the queue
#[clap(value_name = "MESSAGE")]
pub msg: String,
}
fn print_verbose(verbose: bool, msg: &str, queue: &str) {
if verbose {
println!("Sent message: \"{}\" to queue: {}", &msg, &queue);
}
}
impl Send {
pub fn run(&self, verbose: bool) -> Result<()> {
let mq = &mut posixmq::OpenOptions::writeonly();
if self.non_blocking {
mq.nonblocking();
}
if let Some(timeout) = &self.timeout {
mq.open(&self.queue)?.send_timeout(
self.priority,
&self.msg.as_bytes(),
*timeout.parse::<Duration>()?,
)?;
print_verbose(verbose, &self.msg, &self.queue);
} else if let Some(deadline) = &self.deadline {
mq.open(&self.queue)?.send_deadline(
self.priority,
&self.msg.as_bytes(),
DateTime::parse_from_str(deadline, "%Y-%m-%d %H:%M:%S")?.into(),
)?;
print_verbose(verbose, &self.msg, &self.queue);
} else {
mq.open(&self.queue)?
.send(self.priority, &self.msg.as_bytes())?;
print_verbose(verbose, &self.msg, &self.queue);
}
Ok(())
}
}

22
src/posix/unlink.rs Normal file
View file

@ -0,0 +1,22 @@
use anyhow::Result;
use clap::Clap;
/// Delete a message queue
#[derive(Clap, Debug)]
pub struct Unlink {
/// Name of the queue
#[clap(value_name = "QUEUE")]
pub queue: String,
}
impl Unlink {
pub fn run(&self, verbose: bool) -> Result<()> {
posixmq::remove_queue(&self.queue)?;
if verbose {
println!("Removed message queue: {}", &self.queue);
}
Ok(())
}
}