Reschedule the reminders when adding a new one

The thread handling the reminders is unparked and reschedules the
execution of sending out reminders.
This commit is contained in:
finga 2023-02-02 08:35:11 +01:00
parent 7ded3ef430
commit a4a1234f06
4 changed files with 61 additions and 32 deletions

View file

@ -1,4 +1,4 @@
use crate::{schema, NewReminder};
use crate::{schema, AppState, NewReminder};
use anyhow::Error;
use axum::{
extract::State,
@ -6,10 +6,7 @@ use axum::{
response::{IntoResponse, Response, Result},
Json,
};
use diesel::{
prelude::*,
r2d2::{ConnectionManager, Pool},
};
use diesel::prelude::*;
use serde::Deserialize;
use time::OffsetDateTime;
use tracing::{error, trace};
@ -33,7 +30,7 @@ where
}
#[derive(Debug, Deserialize)]
pub struct CreateReminder {
pub struct Reminder {
#[serde(with = "time::serde::iso8601")]
planned: OffsetDateTime,
title: String,
@ -43,8 +40,8 @@ pub struct CreateReminder {
#[allow(clippy::unused_async)]
pub async fn create_reminder(
State(db_pool): State<Pool<ConnectionManager<PgConnection>>>,
Json(data): Json<CreateReminder>,
State(state): State<AppState>,
Json(data): Json<Reminder>,
) -> Result<impl IntoResponse, ServerError> {
let reminder = NewReminder {
created: OffsetDateTime::now_utc(),
@ -58,7 +55,10 @@ pub async fn create_reminder(
diesel::insert_into(schema::reminders::table)
.values(&reminder)
.execute(&mut db_pool.get()?)?;
.execute(&mut state.db_pool.get()?)?;
trace!("unpark reminder thread to reschedule next run");
state.reminder.thread().unpark();
Ok((StatusCode::CREATED, "Reminder created".to_string()))
}

View file

@ -6,7 +6,7 @@ use diesel::{
r2d2::{ConnectionManager, Pool},
};
use lettre::{Message, SmtpTransport, Transport};
use std::{env, net::SocketAddr, time::Duration};
use std::{env, net::SocketAddr, sync::Arc, thread::JoinHandle, time::Duration};
use time::OffsetDateTime;
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
@ -80,6 +80,12 @@ fn remind(
Ok(())
}
#[derive(Clone)]
pub struct AppState {
db_pool: Pool<ConnectionManager<PgConnection>>,
reminder: Arc<JoinHandle<Result<(), Error>>>,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
@ -97,25 +103,23 @@ async fn main() -> Result<()> {
);
let config = Config::load_config(args.config)?;
let mut db_pool = get_connection_pool(&config)?;
std::thread::spawn(move || -> Result<(), Error> {
let reminder = std::thread::spawn(move || -> Result<(), Error> {
let mailer = SmtpTransport::unencrypted_localhost();
loop {
remind(&mut db_pool, &mailer)?;
}
});
let db_pool = get_connection_pool(&config)?;
let app = Router::new()
.route("/v1/reminder", post(api::create_reminder))
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
.fallback(api::not_found)
.with_state(db_pool);
.with_state(AppState {
db_pool,
reminder: Arc::new(reminder),
});
let addr = SocketAddr::from((config.server.address, config.server.port));
info!("{} listening on {}", env!("CARGO_PKG_NAME"), addr);