use anyhow::{anyhow, Error, Result}; use axum::{routing::post, Router}; use clap::Parser; use diesel::{ prelude::*, r2d2::{ConnectionManager, Pool}, }; use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use lettre::{Message, SmtpTransport, Transport}; use std::{env, net::SocketAddr, sync::Arc, thread::JoinHandle, time::Duration}; use time::OffsetDateTime; use tower::ServiceBuilder; use tower_http::trace::TraceLayer; use tracing::{debug, info, trace}; mod api; mod args; mod config; mod models; mod schema; use args::Args; use config::Config; use models::{NewReminder, Reminder}; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!(); fn get_connection_pool(config: &Config) -> Result>> { Ok( Pool::builder().build(ConnectionManager::::new(format!( "postgresql://{}:{}@{}:{}/{}", config.database.user, config.database.pass, config.database.host, config.database.port, config.database.name )))?, ) } #[allow(clippy::cognitive_complexity)] fn remind( db_pool: &mut Pool>, mailer: &SmtpTransport, ) -> Result<()> { info!("checking for reminders"); let result = schema::reminders::dsl::reminders .filter(schema::reminders::executed.is_null()) .order(schema::reminders::planned.asc()) .load::(&mut db_pool.get()?)?; if result.is_empty() { info!("no reminders present, parking indefinitely"); std::thread::park(); } else { for reminder in result { trace!(?reminder, "checking reminder"); if reminder.planned <= OffsetDateTime::now_utc() { let email = Message::builder() .from(format!("{}@localhost", env!("CARGO_BIN_NAME")).parse()?) .to(reminder.receiver.parse()?) .subject(&reminder.title) .body(reminder.message.to_string())?; mailer.send(&email)?; diesel::update(&reminder) .set(schema::reminders::executed.eq(Some(OffsetDateTime::now_utc()))) .execute(&mut db_pool.get()?)?; debug!("email sent to {}", reminder.receiver); } else { let duration = reminder.planned - OffsetDateTime::now_utc(); info!(?duration, "parking reminder"); std::thread::park_timeout(::try_from(duration)?); return Ok(()); } } } Ok(()) } #[derive(Clone)] pub struct AppState { db_pool: Pool>, reminder: Arc>>, } #[tokio::main] async fn main() -> Result<()> { let args = Args::parse(); if env::var("RUST_LOG").is_err() { env::set_var("RUST_LOG", format!("{}", args.log_level)); } tracing_subscriber::fmt::init(); info!( "{} {} started", env!("CARGO_BIN_NAME"), env!("CARGO_PKG_VERSION") ); let config = Config::load_config(args.config)?; let mut db_pool = get_connection_pool(&config)?; trace!(migrations = ?db_pool .get()? .run_pending_migrations(MIGRATIONS) .map_err(|e| anyhow!(e)), "running database migrations"); let reminder = std::thread::spawn(move || -> Result<(), Error> { let mailer = SmtpTransport::unencrypted_localhost(); loop { remind(&mut db_pool, &mailer)?; } }); let db_pool = get_connection_pool(&config)?; let app = Router::new() .route("/v1/reminder", post(api::create_reminder)) .layer(ServiceBuilder::new().layer(TraceLayer::new_for_http())) .fallback(api::not_found) .with_state(AppState { db_pool, reminder: Arc::new(reminder), }); let addr = SocketAddr::from((config.server.address, config.server.port)); info!("{} listening on {}", env!("CARGO_PKG_NAME"), addr); axum::Server::bind(&addr) .serve(app.into_make_service()) .await?; Ok(()) }