aah/src/jobs.rs

111 lines
2.3 KiB
Rust

use std::{
collections::{HashMap, HashSet},
pin::Pin,
sync::Arc,
time::Duration,
};
use futures::{future::pending, Future, FutureExt};
use tokio::{
select,
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
RwLock,
},
time::sleep,
};
/// A unit of work handed to a [`Scheduler`] through a [`JobSender`].
pub struct Job {
/// The future to drive for this job; it resolves to `()` and must be `Send`.
pub code: Pin<Box<dyn Future<Output = ()> + Send>>,
/// When the job should be started.
pub schedule: Schedule,
}
/// Describes when a [`Job`] should be started.
pub enum Schedule {
/// Start as soon as possible; the scheduler treats this as a zero delay.
ASAP,
}
/// Sending half of the unbounded job channel; the matching receiver lives in [`Scheduler`].
pub type JobSender = UnboundedSender<Job>;
/// Receives [`Job`]s from a [`JobSender`] and executes them in [`Scheduler::run`].
///
/// Clones share the same underlying receiver through the `Arc`.
#[derive(Clone)]
pub struct Scheduler {
// Receiving half of the job channel. `recv` needs `&mut` access, so the
// receiver is reached through the write half of the lock.
rx: Arc<RwLock<UnboundedReceiver<Job>>>,
}
impl Scheduler {
pub fn new() -> (Self, JobSender) {
let (tx, rx) = mpsc::unbounded_channel();
let rx = Arc::new(RwLock::new(rx));
let scheduler = Scheduler { rx };
(scheduler, tx)
}
pub async fn run(self) {
let mut id: usize = 1;
let mut jobs = HashMap::<usize, Job>::new();
let mut remove_set = HashSet::new();
loop {
// Calculate the next job
let mut get_next_job = || {
let mut next_job = pending().boxed();
let next_job_time = jobs
.iter()
.map(|(job_id, job)| {
let time = match job.schedule {
Schedule::ASAP => Duration::ZERO,
};
(job_id, time)
})
.min_by_key(|(_, time)| *time);
if let Some((job_id, time)) = next_job_time {
let job_id = *job_id;
let job = jobs.remove(&job_id).unwrap();
if time.is_zero() {
next_job = async {
job.code.await;
}
.boxed();
} else {
next_job = sleep(time).then(move |_| job.code).boxed();
}
}
next_job
};
let next_job = get_next_job();
let mut write_lock = select! {
rx = self.rx.write() => rx,
_ = next_job => {
continue;
}
};
let next_job = get_next_job();
select! {
// Did a new job just come in?
job = write_lock.recv() => {
let job = match job {
Some(v) => v,
None => continue,
};
let this_id = id;
id += 1;
jobs.insert(this_id, job);
}
_ = next_job => {
}
}
for x in remove_set.drain() {
jobs.remove(x);
}
}
}
}