Receive message(s) from a queue

This subcommand receives one or more messages from a queue. Three
different methods are supported, read message by message, read until a
given timout or read until a specified date and time. This subcommand
also supports the `--non-blocking` flag.
This commit is contained in:
finga 2021-06-20 01:53:15 +02:00
parent 36d9e8c02c
commit 3572b12a1a
5 changed files with 147 additions and 1 deletions

View file

@ -1,4 +1,4 @@
use crate::{create::Create, info::Info, send::Send, unlink::Unlink};
use crate::{create::Create, info::Info, recv::Recv, send::Send, unlink::Unlink};
use clap::{crate_authors, crate_version, AppSettings, Clap};
#[derive(Clap, Debug)]
@ -23,4 +23,5 @@ pub enum Command {
Info(Info),
Unlink(Unlink),
Send(Send),
Recv(Recv),
}

View file

@ -4,6 +4,7 @@ use clap::Clap;
mod cli;
mod create;
mod info;
mod recv;
mod send;
mod unlink;
@ -17,6 +18,7 @@ fn main() -> Result<()> {
Command::Info(i) => i.run()?,
Command::Unlink(u) => u.run(opts.verbose)?,
Command::Send(s) => s.run(opts.verbose)?,
Command::Recv(r) => r.run(opts.verbose)?,
}
Ok(())

92
src/recv.rs Normal file
View file

@ -0,0 +1,92 @@
use anyhow::Result;
use chrono::{DateTime, Local};
use clap::Clap;
use posixmq::PosixMq;
use std::{str, time::Duration};
/// Receive and print a message from a message queue
#[derive(Clap, Debug)]
pub struct Recv {
/// Do not block
#[clap(short, long)]
pub non_blocking: bool,
/// Print messages as they are received
#[clap(short, long)]
pub follow: bool,
/// Print a timestamp before each message
#[clap(short, long)]
pub timestamp: bool,
/// Timeout in "<timeout>[s]" (default) or "<timeout>ms"
#[clap(short = 'o', long)]
pub timeout: Option<String>,
/// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S")
#[clap(short, long)]
pub deadline: Option<String>,
#[clap(value_name = "QNAME")]
pub queue: String,
}
fn print_message(verbose: bool, priority: u32, length: usize, timestamp: bool, msg: &str) {
if verbose {
println!("Priority: {}, length: {}", priority, length);
}
if timestamp {
println!("{}", Local::now());
}
println!("{}", msg);
}
impl Recv {
fn receive(&self, verbose: bool, mq: &PosixMq) -> Result<()> {
let mut buf = vec![0; mq.attributes()?.max_msg_len];
if let Some(timeout) = &self.timeout {
let timeout = if timeout.ends_with("ms") {
Duration::from_millis(timeout[0..timeout.len() - 2].parse::<u64>()?)
} else if timeout.ends_with("s") {
Duration::from_secs(timeout[0..timeout.len() - 1].parse::<u64>()?)
} else {
Duration::from_secs(timeout.parse::<u64>()?)
};
let (prio, len) = mq.recv_timeout(&mut buf, timeout)?;
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
} else if let Some(deadline) = &self.deadline {
let (prio, len) = mq.recv_deadline(
&mut buf,
DateTime::parse_from_str(deadline, "%Y-%m-%d %H:%M:%S")?.into(),
)?;
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
} else {
let (prio, len) = mq.recv(&mut buf)?;
print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?);
}
Ok(())
}
pub fn run(&self, verbose: bool) -> Result<()> {
let mq = &mut posixmq::OpenOptions::readonly();
if self.non_blocking {
mq.nonblocking();
}
let mq = mq.open(&self.queue)?;
if self.follow {
loop {
self.receive(verbose, &mq)?;
}
} else {
self.receive(verbose, &mq)?;
}
Ok(())
}
}