mqrs/src/posix/recv.rs
finga 1aab989000 Update build dependencies and edition
Use the 2021 Rust edition. The code was adapted to work with the
current version of clap.
2021-12-03 14:50:23 +01:00

86 lines
2.2 KiB
Rust

use anyhow::Result;
use chrono::{DateTime, Local};
use clap::Parser;
use humantime::Duration;
use log::info;
use posixmq::PosixMq;
use std::str;
/// Receive and print a message from a message queue
// Maintainer note: the `///` comments on this struct and its fields are left
// verbatim because clap's derive uses them as the generated help text; notes
// for developers therefore go into plain `//` comments like this one.
#[derive(Parser, Debug)]
pub struct Recv {
    /// Do not block
    #[clap(long, short)]
    non_blocking: bool,

    /// Print messages as they are received
    #[clap(long, short)]
    follow: bool,

    /// Print a timestamp before each message
    #[clap(long, short)]
    timestamp: bool,

    // Short flag is spelled out as `-o`; the default `-t` would collide with
    // the short flag derived for `timestamp`.
    /// Timeout, example "5h 23min 42ms"
    #[clap(long, short = 'o', conflicts_with = "deadline")]
    timeout: Option<String>,

    // Kept as a raw string here; it is parsed when a message is received.
    /// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S")
    #[clap(long, short, conflicts_with = "timeout")]
    deadline: Option<String>,

    /// Name of the queue
    #[clap(value_name = "QUEUE")]
    queue: String,
}
/// Log a message's metadata and write the message to standard output.
///
/// `priority` and `length` are only reported through the `info!` log line.
/// When `timestamp` is set, the current local time is printed on its own line
/// before `msg`.
fn print_message(priority: u32, length: usize, timestamp: bool, msg: &str) {
    info!("Priority: {}, length: {}", priority, length);

    // Only sample the clock when a timestamp was actually requested.
    let stamp = timestamp.then(Local::now);
    if let Some(now) = stamp {
        println!("{}", now);
    }

    println!("{}", msg);
}
impl Recv {
fn receive(&self, mq: &PosixMq) -> Result<()> {
let mut buf = vec![0; mq.attributes()?.max_msg_len];
if let Some(timeout) = &self.timeout {
let (prio, len) = mq.recv_timeout(&mut buf, *timeout.parse::<Duration>()?)?;
print_message(prio, len, self.timestamp, str::from_utf8(&buf)?);
} else if let Some(deadline) = &self.deadline {
let (prio, len) = mq.recv_deadline(
&mut buf,
DateTime::parse_from_str(deadline, "%Y-%m-%d %H:%M:%S")?.into(),
)?;
print_message(prio, len, self.timestamp, str::from_utf8(&buf)?);
} else {
let (prio, len) = mq.recv(&mut buf)?;
print_message(prio, len, self.timestamp, str::from_utf8(&buf)?);
}
Ok(())
}
pub fn run(&self) -> Result<()> {
let mq = &mut posixmq::OpenOptions::readonly();
if self.non_blocking {
mq.nonblocking();
}
let mq = mq.open(&self.queue)?;
if self.follow {
loop {
self.receive(&mq)?;
}
} else {
self.receive(&mq)?;
}
Ok(())
}
}