mightyohm-gc-exporter/src/main.rs
finga b20ed73290
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Also listen for http requests and serve metrics
Use axum to listen for http requests to respond with the metrics.
2022-04-23 21:02:56 +02:00

165 lines
4.1 KiB
Rust

use anyhow::{bail, Error, Result};
use axum::{extract::Extension, routing::get, Router, Server};
use log::{debug, info, trace};
use std::{
io::{BufRead, BufReader},
net::SocketAddr,
str::FromStr,
sync::{
atomic::{AtomicU32, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use tokio::{spawn, task::JoinHandle, try_join};
#[derive(Default)]
struct AtomicF32 {
storage: AtomicU32,
}
impl AtomicF32 {
fn store(&self, value: f32, ordering: Ordering) {
let as_u32 = value.to_bits();
self.storage.store(as_u32, ordering);
}
fn load(&self, ordering: Ordering) -> f32 {
let as_u32 = self.storage.load(ordering);
f32::from_bits(as_u32)
}
}
#[derive(Debug)]
enum Speed {
Slow = 0,
Fast,
Inst,
}
impl FromStr for Speed {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
match s {
"SLOW" => Ok(Self::Slow),
"FAST" => Ok(Self::Fast),
"INST" => Ok(Self::Inst),
_ => bail!("Could not parse speed"),
}
}
}
#[derive(Default)]
struct Metrics {
cps: AtomicUsize,
cpm: AtomicUsize,
radiation: AtomicF32,
mode: AtomicUsize,
}
fn parse(input: &str) -> Result<(usize, usize, f32, Speed)> {
let input = input
.replace("CPS, ", "")
.replace(", CPM, ", " ")
.replace(", uSv/hr, ", " ")
.replace(", ", " ");
let input: Vec<&str> = input.split(' ').collect();
trace!(
"Parsed values: cps = {}, cpm = {}, μSv/h = {}, mode = {:?}",
input[0],
input[1],
input[2],
input[3]
);
Ok((
input[0].parse::<usize>()?,
input[1].parse::<usize>()?,
input[2].parse::<f32>()?,
input[3].parse::<Speed>()?,
))
}
#[allow(clippy::unused_async, clippy::similar_names)]
async fn listen_serial(metrics: Arc<Metrics>) -> Result<()> {
let port_name = "/dev/serial0";
let baud_rate = 9600;
let port = serialport::new(port_name, baud_rate)
.timeout(Duration::from_millis(1000))
.open()?;
let mut port = BufReader::new(port);
loop {
let mut line = String::new();
port.read_line(&mut line)?;
let line = line.trim();
debug!("Reading line from serial port: {}", line);
let (cps, cpm, radiation, mode) = parse(line)?;
metrics.cps.store(cps, Ordering::Relaxed);
metrics.cpm.store(cpm, Ordering::Relaxed);
metrics.radiation.store(radiation, Ordering::Relaxed);
metrics.mode.store(mode as usize, Ordering::Relaxed);
}
}
#[allow(clippy::unused_async)]
async fn get_metrics(Extension(metrics): Extension<Arc<Metrics>>) -> String {
format!(
r"# HELP mightyohm_gc_cps Counts per second.
# TYPE mightyohm_gc_cps gauge
mightyohm_gc_cps {}
# HELP mightyohm_gc_cpm Counts per minute.
# TYPE mightyohm_gc_cpm gauge
mightyohm_gc_cpm {}
# HELP mightyohm_gc_radiation Ionizing Radiation in μSv/h.
# TYPE mightyohm_gc_radiation gauge
mightyohm_gc_radiation {}
# HELP mightyohm_gc_mode Data acquisition mode (0 = Slow, 1 = Fast, 2 = Inst).
# TYPE mightyohm_gc_mode gauge
mightyohm_gc_mode {}
",
metrics.cps.load(Ordering::Relaxed),
metrics.cpm.load(Ordering::Relaxed),
metrics.radiation.load(Ordering::Relaxed),
metrics.mode.load(Ordering::Relaxed)
)
}
async fn listen_http(metrics: Arc<Metrics>) -> Result<()> {
let app = Router::new()
.route("/metrics", get(get_metrics))
.layer(Extension(metrics));
let addr = SocketAddr::from(([0, 0, 0, 0], 9111));
info!("Listening on 0.0.0.0:9111");
Ok(Server::bind(&addr).serve(app.into_make_service()).await?)
}
async fn flatten<T>(handle: JoinHandle<Result<T>>) -> Result<T>
where
T: Send,
{
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => bail!(err),
Err(err) => bail!(err),
}
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
let metrics = Arc::new(Metrics::default());
try_join!(
flatten(spawn(listen_serial(Arc::clone(&metrics)))),
flatten(spawn(listen_http(Arc::clone(&metrics))))
)?;
Ok(())
}