diff --git a/Cargo.lock b/Cargo.lock index a76a2d3..4d2e6b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -29,6 +29,19 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "chrono" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" +dependencies = [ + "libc", + "num-integer", + "num-traits", + "time", + "winapi", +] + [[package]] name = "clap" version = "3.0.0-beta.2" @@ -112,10 +125,30 @@ name = "mqrs" version = "0.1.0-dev" dependencies = [ "anyhow", + "chrono", "clap", "posixmq", ] +[[package]] +name = "num-integer" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + [[package]] name = "os_str_bytes" version = "2.4.0" @@ -208,6 +241,17 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "time" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" +dependencies = [ + "libc", + "wasi", + "winapi", +] + [[package]] name = "unicode-segmentation" version = "1.7.1" @@ -238,6 +282,12 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "wasi" +version = "0.10.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 4222fbc..b008296 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ description = "A CLI program for interacting Posix Message Queues" anyhow = "1.0" clap = "3.0.0-beta.2" posixmq = "1.0" +chrono = "0.4" diff --git a/src/cli.rs b/src/cli.rs index 8568937..4148233 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,4 +1,4 @@ -use crate::{create::Create, info::Info, send::Send, unlink::Unlink}; +use crate::{create::Create, info::Info, recv::Recv, send::Send, unlink::Unlink}; use clap::{crate_authors, crate_version, AppSettings, Clap}; #[derive(Clap, Debug)] @@ -23,4 +23,5 @@ pub enum Command { Info(Info), Unlink(Unlink), Send(Send), + Recv(Recv), } diff --git a/src/main.rs b/src/main.rs index e361ffe..e6bc64a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use clap::Clap; mod cli; mod create; mod info; +mod recv; mod send; mod unlink; @@ -17,6 +18,7 @@ fn main() -> Result<()> { Command::Info(i) => i.run()?, Command::Unlink(u) => u.run(opts.verbose)?, Command::Send(s) => s.run(opts.verbose)?, + Command::Recv(r) => r.run(opts.verbose)?, } Ok(()) diff --git a/src/recv.rs b/src/recv.rs new file mode 100644 index 0000000..231d841 --- /dev/null +++ b/src/recv.rs @@ -0,0 +1,92 @@ +use anyhow::Result; +use chrono::{DateTime, Local}; +use clap::Clap; +use posixmq::PosixMq; +use std::{str, time::Duration}; + +/// Receive and print a message from a message queue +#[derive(Clap, Debug)] +pub struct Recv { + /// Do not block + #[clap(short, long)] + pub non_blocking: bool, + /// Print messages as they are received + #[clap(short, long)] + pub follow: bool, + /// Print a timestamp before each message + #[clap(short, long)] + pub timestamp: bool, + /// Timeout in "[s]" (default) or "ms" + #[clap(short = 'o', long)] + pub timeout: Option, + /// Deadline until messages are received (format: "%Y-%m-%d %H:%M:%S") + #[clap(short, long)] + pub deadline: Option, + #[clap(value_name = "QNAME")] + pub queue: String, +} + +fn print_message(verbose: bool, priority: u32, length: usize, timestamp: bool, msg: &str) { + if verbose { + println!("Priority: {}, length: {}", priority, length); + } + + if timestamp { + println!("{}", Local::now()); + } + + println!("{}", msg); +} + +impl Recv { + fn receive(&self, verbose: bool, mq: &PosixMq) -> Result<()> { + let mut buf = vec![0; mq.attributes()?.max_msg_len]; + + if let Some(timeout) = &self.timeout { + let timeout = if timeout.ends_with("ms") { + Duration::from_millis(timeout[0..timeout.len() - 2].parse::()?) + } else if timeout.ends_with("s") { + Duration::from_secs(timeout[0..timeout.len() - 1].parse::()?) + } else { + Duration::from_secs(timeout.parse::()?) + }; + + let (prio, len) = mq.recv_timeout(&mut buf, timeout)?; + + print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?); + } else if let Some(deadline) = &self.deadline { + let (prio, len) = mq.recv_deadline( + &mut buf, + DateTime::parse_from_str(deadline, "%Y-%m-%d %H:%M:%S")?.into(), + )?; + + print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?); + } else { + let (prio, len) = mq.recv(&mut buf)?; + + print_message(verbose, prio, len, self.timestamp, str::from_utf8(&buf)?); + } + + Ok(()) + } + + pub fn run(&self, verbose: bool) -> Result<()> { + let mq = &mut posixmq::OpenOptions::readonly(); + + if self.non_blocking { + mq.nonblocking(); + } + + let mq = mq.open(&self.queue)?; + + if self.follow { + loop { + self.receive(verbose, &mq)?; + } + } else { + self.receive(verbose, &mq)?; + } + + Ok(()) + } +}