use anyhow::{bail, Result}; use axum::{routing::get, Extension, Router, Server}; use clap::Parser; use log::{debug, error, info}; use rppal::{ gpio::{Gpio, Mode}, hal::Delay, }; use rppal_dht11::{Dht11, Measurement}; use std::{ net::{IpAddr, SocketAddr}, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, }; use tokio::{spawn, task::JoinHandle, try_join}; #[derive(Parser)] #[clap( about, version, author, infer_subcommands(true), propagate_version(true) )] struct Args { /// The GPIO pin of the DHT11 data line #[clap(short, long, default_value = "4")] gpio: u8, /// The IPv4 or IPv6 address where the metrics are served. #[clap(short, long, default_value = "127.0.0.1")] address: IpAddr, /// The port where the metrics are served. #[clap(short, long, default_value = "9112")] port: u16, } #[derive(Default)] struct AtomicF64 { storage: AtomicU64, } impl AtomicF64 { fn store(&self, value: f64, ordering: Ordering) { let as_u64 = value.to_bits(); self.storage.store(as_u64, ordering); } fn load(&self, ordering: Ordering) -> f64 { let as_u64 = self.storage.load(ordering); f64::from_bits(as_u64) } } #[derive(Default)] struct Metrics { temperature: AtomicF64, humidity: AtomicF64, } #[allow(clippy::unused_async)] async fn listen_dht11(pin: u8, metrics: Arc) -> Result<()> { let pin = Gpio::new()?.get(pin)?.into_io(Mode::Output); let mut dht11 = Dht11::new(pin); let mut delay = Delay::new(); loop { match dht11.perform_measurement_with_retries(&mut delay, 20) { Ok(Measurement { temperature, humidity, }) => { let (temperature, humidity) = (f64::from(temperature) / 10.0, f64::from(humidity) / 10.0); debug!( "Temperature: {:.1} °C Humidity: {:.1}%", temperature, humidity ); metrics.temperature.store(temperature, Ordering::Relaxed); metrics.humidity.store(humidity, Ordering::Relaxed); } Err(e) => error!("Failed to perform measurement: {:?}", e), } } } #[allow(clippy::unused_async)] async fn get_metrics(Extension(metrics): Extension>) -> String { format!( r"# HELP dht11 temperature in Celsius. # TYPE dht11_temperature gauge dht11_temperature {} # HELP dht11 humidity in percent. # TYPE dht11_humidity gauge dht11_humidity {} ", metrics.temperature.load(Ordering::Relaxed), metrics.humidity.load(Ordering::Relaxed), ) } async fn listen_http(address: IpAddr, port: u16, metrics: Arc) -> Result<()> { let app = Router::new() .route("/metrics", get(get_metrics)) .layer(Extension(metrics)); let addr = SocketAddr::from((address, port)); info!("Listening on {}:{}", address, port); Ok(Server::bind(&addr).serve(app.into_make_service()).await?) } async fn flatten(handle: JoinHandle>) -> Result where T: Send, { match handle.await { Ok(Ok(result)) => Ok(result), Ok(Err(err)) => bail!(err), Err(err) => bail!(err), } } #[tokio::main] async fn main() -> Result<()> { env_logger::init(); let args = Args::parse(); let metrics = Arc::new(Metrics::default()); try_join!( flatten(spawn(listen_dht11(4, Arc::clone(&metrics)))), flatten(spawn(listen_http( args.address, args.port, Arc::clone(&metrics) ))) )?; Ok(()) }