diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..16f1e73 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 467bf42..10d90bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,7 @@ dependencies = [ "base64 0.21.2", "bollard", "clap", + "console-subscriber", "entity", "futures", "migration", @@ -691,6 +692,42 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console-api" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" +dependencies = [ + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4cf42660ac07fcebed809cfe561dd8730bcd35b075215e6479c516bcd0d11cb" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.4" @@ -727,6 +764,25 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.8" @@ -890,6 +946,16 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "flate2" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.10.14" @@ -1136,7 +1202,10 @@ version = "7.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" dependencies = [ + "base64 0.13.1", "byteorder", + "flate2", + "nom", "num-traits", ] @@ -1222,6 +1291,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.27" @@ -1246,6 +1321,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyperlocal" version = "0.8.0" @@ -1839,6 +1926,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 1.0.109", +] + +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2901,9 +3020,20 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -2949,6 +3079,34 @@ dependencies = [ "serde", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64 0.21.2", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" @@ -3013,6 +3171,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" dependencies = [ "once_cell", + "valuable", ] [[package]] @@ -3113,6 +3272,12 @@ dependencies = [ "serde", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index e6ffc51..e52d568 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,3 +28,4 @@ sqlx = { version = "0.7.1", features = ["runtime-tokio-rustls", "sqlite", "migra tokio = { version = "1.29.1", features = ["full"] } tower = { version = "0.4.13", features = ["full"] } base64 = "0.21.2" +console-subscriber = "0.1.10" diff --git a/src/config.rs b/src/config.rs index 7c036cb..ee99d9f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap}; +use std::collections::HashMap; #[derive(Debug, Serialize, Deserialize)] pub struct Config { diff --git a/src/jobs.rs b/src/jobs.rs index 2ecec0c..45a7b91 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -1,23 +1,110 @@ -use futures::Future; -use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use std::{ + collections::{HashMap, HashSet}, + pin::Pin, + sync::Arc, + time::Duration, +}; + +use futures::{future::pending, Future, FutureExt}; +use tokio::{ + select, + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + RwLock, + }, + time::sleep, +}; + +pub struct Job { + pub code: Pin + Send>>, + pub schedule: Schedule, +} + +pub enum Schedule { + ASAP, +} -pub type Job = Box>; pub type JobSender = UnboundedSender; +#[derive(Clone)] pub struct Scheduler { - rx: UnboundedReceiver, + rx: Arc>>, } impl Scheduler { pub fn new() -> (Self, JobSender) { let (tx, rx) = mpsc::unbounded_channel(); + let rx = Arc::new(RwLock::new(rx)); let scheduler = Scheduler { rx }; (scheduler, tx) } pub async fn run(self) { + let mut id: usize = 1; + let mut jobs = HashMap::::new(); + let mut remove_set = HashSet::new(); + loop { - // Get the next job, or if a new thing comes in + // Calculate the next job + let mut get_next_job = || { + let mut next_job = pending().boxed(); + let next_job_time = jobs + .iter() + .map(|(job_id, job)| { + let time = match job.schedule { + Schedule::ASAP => Duration::ZERO, + }; + (job_id, time) + }) + .min_by_key(|(_, time)| *time); + if let Some((job_id, time)) = next_job_time { + let job_id = *job_id; + let job = jobs.remove(&job_id).unwrap(); + if time.is_zero() { + next_job = async { + job.code.await; + } + .boxed(); + } else { + next_job = sleep(time).then(move |_| job.code).boxed(); + } + } + + next_job + }; + + let next_job = get_next_job(); + + let mut write_lock = select! { + rx = self.rx.write() => rx, + _ = next_job => { + continue; + } + }; + + let next_job = get_next_job(); + + select! { + // Did a new job just come in? + job = write_lock.recv() => { + let job = match job { + Some(v) => v, + None => continue, + }; + + let this_id = id; + id += 1; + + jobs.insert(this_id, job); + } + + _ = next_job => { + } + } + + for x in remove_set.drain() { + jobs.remove(x); + } } } } diff --git a/src/lib.rs b/src/lib.rs index 8412cd2..1e3e0d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,17 @@ #[macro_use] extern crate serde; -use sea_orm::DatabaseConnection; - pub mod auth; pub mod config; pub mod error; pub mod jobs; pub mod upload; +use jobs::JobSender; +use sea_orm::DatabaseConnection; + #[derive(Clone)] pub struct AppState { pub conn: DatabaseConnection, + pub tx: JobSender, } diff --git a/src/main.rs b/src/main.rs index ad27ad4..da564d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use aah::{ - auth::http_basic_auth, upload::handle_upload_configuration, AppState, + auth::http_basic_auth, jobs::Scheduler, upload::handle_upload_configuration, + AppState, }; use anyhow::Result; use axum::{ @@ -8,17 +9,21 @@ use axum::{ routing::{get, post}, Router, }; - +use futures::try_join; use migration::{Migrator, MigratorTrait}; #[tokio::main] async fn main() -> Result<()> { + console_subscriber::init(); + let options = "sqlite:test.db"; let conn = sea_orm::Database::connect(options).await?; Migrator::up(&conn, None).await?; // Run a web server - let state = AppState { conn }; + let (sched, tx) = Scheduler::new(); + let sched_handle = tokio::spawn(sched.run()); + let state = AppState { conn, tx }; let app = Router::new() .route( "/", @@ -33,6 +38,7 @@ async fn main() -> Result<()> { .serve(app.into_make_service()); let webserver_handle = tokio::spawn(webserver); - webserver_handle.await??; + let (a, _) = try_join!(webserver_handle, sched_handle).unwrap(); + a.unwrap(); Ok(()) } diff --git a/src/upload.rs b/src/upload.rs index b050b76..5923df4 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,4 +1,6 @@ -use anyhow::anyhow; +use std::time::Duration; + +use anyhow::{anyhow, Result}; use axum::extract::{Multipart, State}; use bollard::{ container::{ @@ -8,42 +10,88 @@ use bollard::{ Docker, }; use entity::service; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use sea_orm::{ActiveModelTrait, Set}; +use tokio::time::sleep; use crate::{ config::{Config, SourceConfig}, error::AppError, + jobs::{Job, Schedule}, AppState, }; pub async fn handle_upload_configuration( state: State, - mut form: Multipart, + form: Multipart, ) -> Result<(), AppError> { - let (name, config) = { - let mut name = None; - let mut config = None::; - while let Some(field) = form.next_field().await.unwrap() { - let key = field.name().unwrap().to_owned(); - let value = - String::from_utf8(field.bytes().await.unwrap().to_vec()).unwrap(); - match key.as_ref() { - "name" => name = Some(value), - "config" => config = Some(serde_yaml::from_str(&value).unwrap()), - _ => return Err(anyhow!("error").into()), - } - } - (name.unwrap(), config.unwrap()) - }; + let upload_form = extract_multipart(form).await.unwrap(); + let config_string = serde_json::to_string(&upload_form.config).unwrap(); + state + .tx + .send(Job { + code: async move { + spawn_docker(&upload_form.config).await.unwrap(); + println!("done with job"); + } + .boxed(), + schedule: Schedule::ASAP, + }) + .unwrap(); + println!("spawned job"); + + service::ActiveModel { + name: Set(upload_form.name), + config: Set(config_string), + overall_status: Set("New".into()), + ..Default::default() + } + .save(&state.conn) + .await + .unwrap(); + + Ok(()) +} + +struct UploadForm { + name: String, + config: Config, +} + +async fn extract_multipart(mut form: Multipart) -> Result { + let mut name = None; + let mut config = None::; + while let Some(field) = form.next_field().await.unwrap() { + let key = field.name().unwrap().to_owned(); + let value = + String::from_utf8(field.bytes().await.unwrap().to_vec()).unwrap(); + match key.as_ref() { + "name" => name = Some(value), + "config" => config = Some(serde_yaml::from_str(&value).unwrap()), + _ => return Err(anyhow!("error").into()), + } + } + + Ok(UploadForm { + name: name.unwrap(), + config: config.unwrap(), + }) +} + +async fn spawn_docker(config: &Config) -> Result<()> { println!("Loading config: {config:?}"); let docker = Docker::connect_with_local_defaults().unwrap(); + println!("connected to docker {docker:?}"); + sleep(Duration::from_secs(1)).await; - for (_service_name, service_config) in config.service.iter() { + for (service_name, service_config) in config.service.iter() { + println!("preparing config for {}", service_name); let SourceConfig::Image { image: from_image } = &service_config.source; - let result_stream = docker.create_image( + + println!("creating image {}", from_image); + let mut result_stream = docker.create_image( Some(CreateImageOptions { from_image: from_image.to_owned(), ..Default::default() @@ -51,14 +99,11 @@ pub async fn handle_upload_configuration( None, None, ); + println!("result stream: "); - result_stream - .try_for_each(|x| async move { - println!("{x:?}"); - Ok(()) - }) - .await - .unwrap(); + while let Ok(Some(v)) = result_stream.try_next().await { + println!("v: {v:?}"); + } println!("completed pulling."); // create a container @@ -83,14 +128,5 @@ pub async fn handle_upload_configuration( println!("result: {result:?}"); } - service::ActiveModel { - name: Set(name), - config: Set(serde_json::to_string(&config).unwrap()), - ..Default::default() - } - .save(&state.conn) - .await - .unwrap(); - Ok(()) }