Support timeout and deadline parameters for send
Add support for timeout and deadline parameters when messages are sent to a message queue. To keep code duplication small, the `utils.rs` file contains a helper function to parse the `Duration` from the `String` received from the timeout parameter. Add conflict rules for the timeout and deadline parameters.
This commit is contained in:
parent
9b52305285
commit
1100ba1b13
4 changed files with 56 additions and 16 deletions
|
@ -7,6 +7,7 @@ mod info;
|
||||||
mod recv;
|
mod recv;
|
||||||
mod send;
|
mod send;
|
||||||
mod unlink;
|
mod unlink;
|
||||||
|
mod utils;
|
||||||
|
|
||||||
use cli::{Command, Opts};
|
use cli::{Command, Opts};
|
||||||
|
|
||||||
|
|
17
src/recv.rs
17
src/recv.rs
|
@ -1,8 +1,9 @@
|
||||||
|
use crate::utils;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use chrono::{DateTime, Local};
|
use chrono::{DateTime, Local};
|
||||||
use clap::Clap;
|
use clap::Clap;
|
||||||
use posixmq::PosixMq;
|
use posixmq::PosixMq;
|
||||||
use std::{str, time::Duration};
|
use std::str;
|
||||||
|
|
||||||
/// Receive and print a message from a message queue
|
/// Receive and print a message from a message queue
|
||||||
#[derive(Clap, Debug)]
|
#[derive(Clap, Debug)]
|
||||||
|
@ -17,10 +18,10 @@ pub struct Recv {
|
||||||
#[clap(short, long)]
|
#[clap(short, long)]
|
||||||
pub timestamp: bool,
|
pub timestamp: bool,
|
||||||
/// Timeout in "<timeout>[s]" (default) or "<timeout>ms"
|
/// Timeout in "<timeout>[s]" (default) or "<timeout>ms"
|
||||||
#[clap(short = 'o', long)]
|
#[clap(short = 'o', long, conflicts_with = "deadline")]
|
||||||
pub timeout: Option<String>,
|
pub timeout: Option<String>,
|
||||||
/// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S")
|
/// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S")
|
||||||
#[clap(short, long)]
|
#[clap(short, long, conflicts_with = "timeout")]
|
||||||
pub deadline: Option<String>,
|
pub deadline: Option<String>,
|
||||||
#[clap(value_name = "QUEUE")]
|
#[clap(value_name = "QUEUE")]
|
||||||
pub queue: String,
|
pub queue: String,
|
||||||
|
@ -43,15 +44,7 @@ impl Recv {
|
||||||
let mut buf = vec![0; mq.attributes()?.max_msg_len];
|
let mut buf = vec![0; mq.attributes()?.max_msg_len];
|
||||||
|
|
||||||
if let Some(timeout) = &self.timeout {
|
if let Some(timeout) = &self.timeout {
|
||||||
let timeout = if timeout.ends_with("ms") {
|
let (prio, len) = mq.recv_timeout(&mut buf, utils::parse_duration(timeout)?)?;
|
||||||
Duration::from_millis(timeout[0..timeout.len() - 2].parse::<u64>()?)
|
|
||||||
} else if timeout.ends_with("s") {
|
|
||||||
Duration::from_secs(timeout[0..timeout.len() - 1].parse::<u64>()?)
|
|
||||||
} else {
|
|
||||||
Duration::from_secs(timeout.parse::<u64>()?)
|
|
||||||
};
|
|
||||||
|
|
||||||
let (prio, len) = mq.recv_timeout(&mut buf, timeout)?;
|
|
||||||
|
|
||||||
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
|
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
|
||||||
} else if let Some(deadline) = &self.deadline {
|
} else if let Some(deadline) = &self.deadline {
|
||||||
|
|
30
src/send.rs
30
src/send.rs
|
@ -1,4 +1,6 @@
|
||||||
|
use crate::utils;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
use chrono::DateTime;
|
||||||
use clap::Clap;
|
use clap::Clap;
|
||||||
|
|
||||||
/// Send a message to a message queue
|
/// Send a message to a message queue
|
||||||
|
@ -10,6 +12,12 @@ pub struct Send {
|
||||||
/// Do not block
|
/// Do not block
|
||||||
#[clap(short, long)]
|
#[clap(short, long)]
|
||||||
pub non_blocking: bool,
|
pub non_blocking: bool,
|
||||||
|
/// Timeout in "<timeout>[s]" (default) or "<timeout>ms"
|
||||||
|
#[clap(short = 'o', long, conflicts_with = "deadline")]
|
||||||
|
pub timeout: Option<String>,
|
||||||
|
/// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S")
|
||||||
|
#[clap(short, long, conflicts_with = "timeout")]
|
||||||
|
pub deadline: Option<String>,
|
||||||
/// Name of the queue
|
/// Name of the queue
|
||||||
#[clap(value_name = "QUEUE")]
|
#[clap(value_name = "QUEUE")]
|
||||||
pub queue: String,
|
pub queue: String,
|
||||||
|
@ -26,12 +34,34 @@ impl Send {
|
||||||
mq.nonblocking();
|
mq.nonblocking();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some(timeout) = &self.timeout {
|
||||||
|
mq.open(&self.queue)?.send_timeout(
|
||||||
|
self.priority,
|
||||||
|
&self.msg.as_bytes(),
|
||||||
|
utils::parse_duration(timeout)?,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if verbose {
|
||||||
|
println!("Sent message: \"{}\" to queue: {}", &self.msg, &self.queue);
|
||||||
|
}
|
||||||
|
} else if let Some(deadline) = &self.deadline {
|
||||||
|
mq.open(&self.queue)?.send_deadline(
|
||||||
|
self.priority,
|
||||||
|
&self.msg.as_bytes(),
|
||||||
|
DateTime::parse_from_str(deadline, "%Y-%m-%d %H:%M:%S")?.into(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if verbose {
|
||||||
|
println!("Sent message: \"{}\" to queue: {}", &self.msg, &self.queue);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
mq.open(&self.queue)?
|
mq.open(&self.queue)?
|
||||||
.send(self.priority, &self.msg.as_bytes())?;
|
.send(self.priority, &self.msg.as_bytes())?;
|
||||||
|
|
||||||
if verbose {
|
if verbose {
|
||||||
println!("Sent message: \"{}\" to queue: {}", &self.msg, &self.queue);
|
println!("Sent message: \"{}\" to queue: {}", &self.msg, &self.queue);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
16
src/utils.rs
Normal file
16
src/utils.rs
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
use anyhow::Result;
|
||||||
|
use std::{str, time::Duration};
|
||||||
|
|
||||||
|
pub fn parse_duration(timeout: &str) -> Result<Duration> {
|
||||||
|
if timeout.ends_with("ms") {
|
||||||
|
Ok(Duration::from_millis(
|
||||||
|
timeout[0..timeout.len() - 2].parse::<u64>()?,
|
||||||
|
))
|
||||||
|
} else if timeout.ends_with("s") {
|
||||||
|
Ok(Duration::from_secs(
|
||||||
|
timeout[0..timeout.len() - 1].parse::<u64>()?,
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Ok(Duration::from_secs(timeout.parse::<u64>()?))
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue