use anyhow::{bail, Error, Result}; use axum::{extract::Extension, routing::get, Router, Server}; use clap::Parser; use log::{debug, info, trace}; use std::{ io::{BufRead, BufReader}, net::{IpAddr, SocketAddr}, str::FromStr, sync::{ atomic::{AtomicU32, AtomicUsize, Ordering}, Arc, }, time::Duration, }; use tokio::{spawn, task::JoinHandle, try_join}; #[derive(Parser)] #[clap( about, version, author, infer_subcommands(true), propagate_version(true) )] struct Args { /// The port of the serial device, usually in '/dev/...'. #[clap(short, long, default_value = "/dev/serial0")] serial_port: String, /// The baudrate of the serial connection. #[clap(short, long, default_value = "9600")] baud_rate: u32, /// Timeout of the serial connection in ms. #[clap(short, long, default_value = "1000")] timeout: u64, /// The IPv4 or IPv6 address where the metrics are served. #[clap(short, long, default_value = "127.0.0.1")] address: IpAddr, /// The port where the metrics are served. #[clap(short, long, default_value = "9111")] port: u16, } #[derive(Default)] struct AtomicF32 { storage: AtomicU32, } impl AtomicF32 { fn store(&self, value: f32, ordering: Ordering) { let as_u32 = value.to_bits(); self.storage.store(as_u32, ordering); } fn load(&self, ordering: Ordering) -> f32 { let as_u32 = self.storage.load(ordering); f32::from_bits(as_u32) } } #[derive(Debug)] enum Speed { Slow = 0, Fast, Inst, } impl FromStr for Speed { type Err = Error; fn from_str(s: &str) -> Result { match s { "SLOW" => Ok(Self::Slow), "FAST" => Ok(Self::Fast), "INST" => Ok(Self::Inst), _ => bail!("Could not parse speed"), } } } #[derive(Default)] struct Metrics { cps: AtomicUsize, cpm: AtomicUsize, radiation: AtomicF32, mode: AtomicUsize, } fn parse(input: &str) -> Result<(usize, usize, f32, Speed)> { let input = input .replace("CPS, ", "") .replace(", CPM, ", " ") .replace(", uSv/hr, ", " ") .replace(", ", " "); let input: Vec<&str> = input.split(' ').collect(); trace!( "Parsed values: cps = {}, cpm = {}, μSv/h = {}, mode = {:?}", input[0], input[1], input[2], input[3] ); Ok(( input[0].parse::()?, input[1].parse::()?, input[2].parse::()?, input[3].parse::()?, )) } #[allow(clippy::unused_async, clippy::similar_names)] async fn listen_serial( port_name: String, baud_rate: u32, timeout: u64, metrics: Arc, ) -> Result<()> { let port = serialport::new(port_name, baud_rate) .timeout(Duration::from_millis(timeout)) .open()?; let mut port = BufReader::new(port); loop { let mut line = String::new(); port.read_line(&mut line)?; let line = line.trim(); debug!("Reading line from serial port: {}", line); let (cps, cpm, radiation, mode) = parse(line)?; metrics.cps.store(cps, Ordering::Relaxed); metrics.cpm.store(cpm, Ordering::Relaxed); metrics.radiation.store(radiation, Ordering::Relaxed); metrics.mode.store(mode as usize, Ordering::Relaxed); } } #[allow(clippy::unused_async)] async fn get_metrics(Extension(metrics): Extension>) -> String { format!( r"# HELP mightyohm_gc_cps Counts per second. # TYPE mightyohm_gc_cps gauge mightyohm_gc_cps {} # HELP mightyohm_gc_cpm Counts per minute. # TYPE mightyohm_gc_cpm gauge mightyohm_gc_cpm {} # HELP mightyohm_gc_radiation Ionizing Radiation in μSv/h. # TYPE mightyohm_gc_radiation gauge mightyohm_gc_radiation {} # HELP mightyohm_gc_mode Data acquisition mode (0 = Slow, 1 = Fast, 2 = Inst). # TYPE mightyohm_gc_mode gauge mightyohm_gc_mode {} ", metrics.cps.load(Ordering::Relaxed), metrics.cpm.load(Ordering::Relaxed), metrics.radiation.load(Ordering::Relaxed), metrics.mode.load(Ordering::Relaxed) ) } async fn listen_http(address: IpAddr, port: u16, metrics: Arc) -> Result<()> { let app = Router::new() .route("/metrics", get(get_metrics)) .layer(Extension(metrics)); let addr = SocketAddr::from((address, port)); info!("Listening on {}:{}", address, port); Ok(Server::bind(&addr).serve(app.into_make_service()).await?) } async fn flatten(handle: JoinHandle>) -> Result where T: Send, { match handle.await { Ok(Ok(result)) => Ok(result), Ok(Err(err)) => bail!(err), Err(err) => bail!(err), } } #[tokio::main] async fn main() -> Result<()> { env_logger::init(); let args = Args::parse(); let metrics = Arc::new(Metrics::default()); try_join!( flatten(spawn(listen_serial( args.serial_port, args.baud_rate, args.timeout, Arc::clone(&metrics) ))), flatten(spawn(listen_http( args.address, args.port, Arc::clone(&metrics) ))) )?; Ok(()) }