From cc2bb42c113aa0fca8dcc271606d4f0f99e0255d Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Mon, 7 Aug 2023 03:00:32 -0500 Subject: [PATCH] some refactor --- src/auth.rs | 52 +++++++++++++++ src/config.rs | 2 +- src/jobs.rs | 23 +++++++ src/lib.rs | 15 +++++ src/main.rs | 175 +++----------------------------------------------- src/upload.rs | 96 +++++++++++++++++++++++++++ 6 files changed, 195 insertions(+), 168 deletions(-) create mode 100644 src/auth.rs create mode 100644 src/jobs.rs create mode 100644 src/lib.rs create mode 100644 src/upload.rs diff --git a/src/auth.rs b/src/auth.rs new file mode 100644 index 0000000..a08570c --- /dev/null +++ b/src/auth.rs @@ -0,0 +1,52 @@ +use axum::{ + http::{ + header::{AUTHORIZATION, WWW_AUTHENTICATE}, + Request, + }, + middleware::Next, + response::Response, +}; +use base64::{prelude::BASE64_STANDARD, Engine}; + +pub async fn http_basic_auth( + request: Request, + next: Next, +) -> Response { + // do something with `request`... + let headers = request.headers(); + let auth_fail_response = Response::builder() + .status(401) + .header(WWW_AUTHENTICATE, "Basic") + .body(axum::body::boxed(String::from(""))) + .unwrap(); + + let authorization = match headers.get(AUTHORIZATION) { + Some(v) => v.to_str().unwrap(), + None => return auth_fail_response, + }; + let parts = authorization.split_ascii_whitespace().collect::>(); + let base64part = match &parts[..] { + ["Basic", base64part] => *base64part, + _ => return auth_fail_response, + }; + let decoded = match BASE64_STANDARD + .decode(base64part.as_bytes()) + .map_err(|e| anyhow::Error::from(e)) + .and_then(|e| String::from_utf8(e).map_err(|e| e.into())) + { + Ok(v) => v, + Err(_) => return auth_fail_response, + }; + let parts = decoded.split(":").collect::>(); + let (user, pass) = match &parts[..] { + [user, pass] => (*user, *pass), + _ => return auth_fail_response, + }; + if !(user == "root" && pass == "hellosu") { + return auth_fail_response; + } + + let response = next.run(request).await; + + response +} diff --git a/src/config.rs b/src/config.rs index ee03f21..7c036cb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, hash::Hash}; +use std::{collections::HashMap}; #[derive(Debug, Serialize, Deserialize)] pub struct Config { diff --git a/src/jobs.rs b/src/jobs.rs new file mode 100644 index 0000000..2ecec0c --- /dev/null +++ b/src/jobs.rs @@ -0,0 +1,23 @@ +use futures::Future; +use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; + +pub type Job = Box>; +pub type JobSender = UnboundedSender; + +pub struct Scheduler { + rx: UnboundedReceiver, +} + +impl Scheduler { + pub fn new() -> (Self, JobSender) { + let (tx, rx) = mpsc::unbounded_channel(); + let scheduler = Scheduler { rx }; + (scheduler, tx) + } + + pub async fn run(self) { + loop { + // Get the next job, or if a new thing comes in + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8412cd2 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,15 @@ +#[macro_use] +extern crate serde; + +use sea_orm::DatabaseConnection; + +pub mod auth; +pub mod config; +pub mod error; +pub mod jobs; +pub mod upload; + +#[derive(Clone)] +pub struct AppState { + pub conn: DatabaseConnection, +} diff --git a/src/main.rs b/src/main.rs index e33ab0d..ad27ad4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,38 +1,15 @@ -#[macro_use] -extern crate serde; - -mod config; -mod error; - -use anyhow::{anyhow, Result}; +use aah::{ + auth::http_basic_auth, upload::handle_upload_configuration, AppState, +}; +use anyhow::Result; use axum::{ - extract::{Multipart, State}, - http::{ - header::{AUTHORIZATION, WWW_AUTHENTICATE}, - HeaderName, Request, - }, - middleware::{from_fn, Next}, - response::{Html, Response}, + middleware::from_fn, + response::Html, routing::{get, post}, - Form, Json, Router, + Router, }; -use axum_auth::AuthBasic; -use base64::{prelude::BASE64_STANDARD, Engine}; -use bollard::{ - auth, - container::{ - Config as ContainerConfig, CreateContainerOptions, StartContainerOptions, - }, - image::CreateImageOptions, - Docker, -}; -use entity::service; -use error::AppError; -use futures::{StreamExt, TryStreamExt}; -use migration::{Migrator, MigratorTrait}; -use sea_orm::{ActiveModelTrait, DatabaseConnection, Set}; -use crate::config::{Config, SourceConfig}; +use migration::{Migrator, MigratorTrait}; #[tokio::main] async fn main() -> Result<()> { @@ -59,139 +36,3 @@ async fn main() -> Result<()> { webserver_handle.await??; Ok(()) } - -async fn http_basic_auth(request: Request, next: Next) -> Response { - // do something with `request`... - let headers = request.headers(); - let auth_fail_response = Response::builder() - .status(401) - .header(WWW_AUTHENTICATE, "Basic") - .body(axum::body::boxed(String::from(""))) - .unwrap(); - - let authorization = match headers.get(AUTHORIZATION) { - Some(v) => v.to_str().unwrap(), - None => return auth_fail_response, - }; - let parts = authorization.split_ascii_whitespace().collect::>(); - let base64part = match &parts[..] { - ["Basic", base64part] => *base64part, - _ => return auth_fail_response, - }; - let decoded = match BASE64_STANDARD - .decode(base64part.as_bytes()) - .map_err(|e| anyhow::Error::from(e)) - .and_then(|e| String::from_utf8(e).map_err(|e| e.into())) - { - Ok(v) => v, - Err(_) => return auth_fail_response, - }; - let parts = decoded.split(":").collect::>(); - let (user, pass) = match &parts[..] { - [user, pass] => (*user, *pass), - _ => return auth_fail_response, - }; - if !(user == "root" && pass == "hellosu") { - return auth_fail_response; - } - - let response = next.run(request).await; - - response -} - -#[derive(Clone)] -struct AppState { - conn: DatabaseConnection, -} - -#[derive(Deserialize)] -struct UploadForm { - name: String, - config: String, -} - -async fn handle_upload_configuration( - AuthBasic((id, password)): AuthBasic, - state: State, - mut form: Multipart, - // Form(upload_form): Form, -) -> Result<(), AppError> { - if id != "root" || password.as_ref().unwrap() != "hellosu" { - return Err(anyhow!("error").into()); - } - - let (name, config) = { - let mut name = None; - let mut config = None::; - while let Some(field) = form.next_field().await.unwrap() { - let key = field.name().unwrap().to_owned(); - let value = - String::from_utf8(field.bytes().await.unwrap().to_vec()).unwrap(); - match key.as_ref() { - "name" => name = Some(value), - "config" => config = Some(serde_yaml::from_str(&value).unwrap()), - _ => return Err(anyhow!("error").into()), - } - } - (name.unwrap(), config.unwrap()) - }; - - println!("Loading config: {config:?}"); - - let docker = Docker::connect_with_local_defaults().unwrap(); - - for (service_name, service_config) in config.service.iter() { - let SourceConfig::Image { image: from_image } = &service_config.source; - let result_stream = docker.create_image( - Some(CreateImageOptions { - from_image: from_image.to_owned(), - ..Default::default() - }), - None, - None, - ); - - result_stream - .try_for_each(|x| async move { - println!("{x:?}"); - Ok(()) - }) - .await - .unwrap(); - println!("completed pulling."); - - // create a container - let result = docker - .create_container( - Some(CreateContainerOptions:: { - ..Default::default() - }), - ContainerConfig { - image: Some("hello-world"), - cmd: Some(vec!["/hello"]), - ..Default::default() - }, - ) - .await - .unwrap(); - - docker - .start_container(&result.id, None::>) - .await - .unwrap(); - - println!("result: {result:?}"); - } - - service::ActiveModel { - name: Set(name), - config: Set(serde_json::to_string(&config).unwrap()), - ..Default::default() - } - .save(&state.conn) - .await - .unwrap(); - - Ok(()) -} diff --git a/src/upload.rs b/src/upload.rs new file mode 100644 index 0000000..b050b76 --- /dev/null +++ b/src/upload.rs @@ -0,0 +1,96 @@ +use anyhow::anyhow; +use axum::extract::{Multipart, State}; +use bollard::{ + container::{ + Config as ContainerConfig, CreateContainerOptions, StartContainerOptions, + }, + image::CreateImageOptions, + Docker, +}; +use entity::service; +use futures::TryStreamExt; +use sea_orm::{ActiveModelTrait, Set}; + +use crate::{ + config::{Config, SourceConfig}, + error::AppError, + AppState, +}; + +pub async fn handle_upload_configuration( + state: State, + mut form: Multipart, +) -> Result<(), AppError> { + let (name, config) = { + let mut name = None; + let mut config = None::; + while let Some(field) = form.next_field().await.unwrap() { + let key = field.name().unwrap().to_owned(); + let value = + String::from_utf8(field.bytes().await.unwrap().to_vec()).unwrap(); + match key.as_ref() { + "name" => name = Some(value), + "config" => config = Some(serde_yaml::from_str(&value).unwrap()), + _ => return Err(anyhow!("error").into()), + } + } + (name.unwrap(), config.unwrap()) + }; + + println!("Loading config: {config:?}"); + + let docker = Docker::connect_with_local_defaults().unwrap(); + + for (_service_name, service_config) in config.service.iter() { + let SourceConfig::Image { image: from_image } = &service_config.source; + let result_stream = docker.create_image( + Some(CreateImageOptions { + from_image: from_image.to_owned(), + ..Default::default() + }), + None, + None, + ); + + result_stream + .try_for_each(|x| async move { + println!("{x:?}"); + Ok(()) + }) + .await + .unwrap(); + println!("completed pulling."); + + // create a container + let result = docker + .create_container( + Some(CreateContainerOptions:: { + ..Default::default() + }), + ContainerConfig { + image: Some(from_image.to_owned()), + ..Default::default() + }, + ) + .await + .unwrap(); + + docker + .start_container(&result.id, None::>) + .await + .unwrap(); + + println!("result: {result:?}"); + } + + service::ActiveModel { + name: Set(name), + config: Set(serde_json::to_string(&config).unwrap()), + ..Default::default() + } + .save(&state.conn) + .await + .unwrap(); + + Ok(()) +}