kill listeners after reload

This commit is contained in:
Michael Zhang 2024-11-30 17:15:13 -06:00
parent a800ddd08e
commit 7beb26d5ae
2 changed files with 50 additions and 19 deletions

View file

@ -19,7 +19,10 @@ use sqlx::{
};
use tokio::{
net::TcpListener,
sync::mpsc::{self},
sync::{
mpsc::{self},
oneshot,
},
};
use crate::block::{
@ -27,6 +30,9 @@ use crate::block::{
};
use crate::worker::worker;
pub type ExitSender = oneshot::Sender<()>;
pub type ExitListener = oneshot::Receiver<()>;
#[derive(Clone)]
pub struct AppState {
pub db: SqlitePool,

View file

@ -1,10 +1,13 @@
use std::str::FromStr;
use anyhow::Result;
use futures::TryStreamExt;
use futures::{select, TryStreamExt};
use serde_json::Value;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
task::JoinHandle,
};
use tokio_util::{
@ -12,46 +15,58 @@ use tokio_util::{
io::StreamReader,
};
use crate::{block::Block, AppState, DatabaseUpdate};
use crate::{block::Block, AppState, DatabaseUpdate, ExitListener, ExitSender};
pub async fn worker(
state: AppState,
mut database_updates: UnboundedReceiver<DatabaseUpdate>,
) {
let mut prev_handle: Option<JoinHandle<()>> = None;
let mut prev_exit_tx: Option<ExitSender> = None;
loop {
let upd = database_updates.recv().await;
println!("Update: {upd:?}");
if let Some(handle) = prev_handle.take() {
handle.abort();
if let Some(exit_tx) = prev_exit_tx.take() {
exit_tx.send(());
}
let state = state.clone();
prev_handle = Some(tokio::spawn(async move {
run(state).await;
}));
let (tx, rx) = oneshot::channel();
prev_exit_tx = Some(tx);
tokio::spawn(async move {
run(rx, state).await;
});
}
}
async fn run(state: AppState) -> Result<()> {
async fn run(exit_rx: ExitListener, state: AppState) -> Result<()> {
let blocks = sqlx::query_as::<_, Block>(r"SELECT * FROM blocks")
.fetch_all(&state.db)
.await?;
println!("blocks: {blocks:?}");
let mut exit_txs = Vec::new();
for block in blocks {
let (tx, rx) = oneshot::channel();
exit_txs.push(tx);
if block.kind == "ntfy" {
tokio::spawn(run_ntfy_listener());
tokio::spawn(run_ntfy_listener(rx));
}
}
exit_rx.await;
for tx in exit_txs {
tx.send(());
}
Ok(())
}
async fn run_ntfy_listener() {
async fn run_ntfy_listener(mut exit_rx: ExitListener) {
// TODO: Exponential backoff retry
let res = reqwest::get("https://ntfy.sh/LIXE61yAwoClnxyL/json")
.await
@ -60,12 +75,22 @@ async fn run_ntfy_listener() {
let stream = StreamReader::new(stream);
let mut stream = FramedRead::new(stream, LinesCodec::new());
while let Ok(Some(line)) = stream.try_next().await {
let line = Value::from_str(&line).unwrap();
println!("line: {line:?}");
loop {
tokio::select! {
line = stream.try_next() => {
let line = match line {
Ok(Some(line)) => Value::from_str(&line).unwrap(),
_ => continue,
};
// TODO: Push this event to the database
// TODO: Get a list of all of the connections from the database
// TODO: Push a new task for each of the new connections into the database
println!("line: {line:?}");
// TODO: Tell ntfy.sh that we received it
// TODO: Push this event to the database
// TODO: Get a list of all of the connections from the database
// TODO: Push a new task for each of the new connections into the database
}
_ = &mut exit_rx => { break }
}
}
}