From 642ff6616191be8ab451f67b63eac852f5dd1bed Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Mon, 20 Aug 2018 20:53:37 -0500 Subject: [PATCH] wip --- examples/github.rs | 27 +++-- src/config.rs | 3 +- src/handler.rs | 249 ++++++++++++++++++++++++++++++++++----------- src/hook.rs | 53 ++++++---- src/lib.rs | 4 +- src/service.rs | 64 +++++------- 6 files changed, 275 insertions(+), 125 deletions(-) diff --git a/examples/github.rs b/examples/github.rs index 2574d9f..0b933f0 100644 --- a/examples/github.rs +++ b/examples/github.rs @@ -60,20 +60,24 @@ fn default_path() -> PathBuf { PathBuf::from(".") } -fn main() -> Result<(), Error> { +fn main() { let args = Opt::from_args(); - let config: Config = serde_json::from_str(&args.config)?; + let config: Config = serde_json::from_str(&args.config).expect("Could not parse config."); println!("{:?}", config); let mut payload = String::new(); - io::stdin().read_to_string(&mut payload)?; - let payload: Payload = serde_json::from_str(&payload)?; + io::stdin() + .read_to_string(&mut payload) + .expect("Could not read from stdin"); + let payload: Payload = serde_json::from_str(&payload) + .expect(&format!("Could not parse stdin into json: '{}'", payload)); if !config.disable_hmac_verify { let secret = GenericArray::from_iter(config.secret.bytes()); let mut mac = Hmac::::new(&secret); mac.input(payload.body.as_bytes()); - let signature = mac.result() + let signature = mac + .result() .code() .into_iter() .map(|b| format!("{:02x}", b)) @@ -83,15 +87,18 @@ fn main() -> Result<(), Error> { let auth = payload .headers .get("x-hub-signature") - .ok_or(err_msg("Missing auth header"))?; + .ok_or(err_msg("Missing auth header")) + .expect("Missing auth header"); let left = SecStr::from(format!("sha1={}", signature)); let right = SecStr::from(auth.bytes().collect::>()); assert!(left == right, "HMAC signature didn't match",); } - let payload: GithubPayload = serde_json::from_str(&payload.body)?; - let mut target_path = PathBuf::from(env::var("DIP_WORKDIR")?); + let payload: GithubPayload = + serde_json::from_str(&payload.body).expect("Could not parse Github input into json"); + let mut target_path = + PathBuf::from(env::var("DIP_WORKDIR").expect("Could not determine working directory")); target_path.push(&config.path); Command::new("git") .arg("clone") @@ -100,6 +107,6 @@ fn main() -> Result<(), Error> { .arg("--depth") .arg("1") .arg(&target_path) - .output()?; - Ok(()) + .output() + .expect("Could not spawn process to clone"); } diff --git a/src/config.rs b/src/config.rs index fd8c5a5..1ff28a3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,7 +65,8 @@ where if !path.is_file() { continue; } - match path.file_name() + match path + .file_name() .and_then(|s| s.to_str()) .ok_or(err_msg("???")) .map(|s| { diff --git a/src/handler.rs b/src/handler.rs index bcd41be..76b4ce8 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,20 +1,26 @@ use std::io::Write; use std::path::PathBuf; -use std::process::{Command, Stdio}; +use std::process::{Command, Output, Stdio}; use failure::{err_msg, Error}; -use futures::{future, Future}; +use futures::{ + future::{self, FutureResult}, + Future, +}; use serde::Serialize; use serde_json::{Serializer as JsonSerializer, Value as JsonValue}; +use tokio_process::CommandExt; use toml::Value as TomlValue; use PROGRAMS; +#[derive(Clone, Debug)] pub struct Handler { - config: TomlValue, - action: Action, + pub config: TomlValue, + pub action: Action, } +#[derive(Clone, Debug)] pub enum Action { Command(String), Exec(PathBuf), @@ -48,8 +54,7 @@ impl Handler { value .canonicalize() .map_err(|_| err_msg("failed to canonicalize the path")) - }) - .map(|value| value.clone())?; + }).map(|value| value.clone())?; Action::Exec(program) } }; @@ -58,89 +63,217 @@ impl Handler { } pub fn run( - &self, + config: TomlValue, + action: Action, temp_path: PathBuf, input: JsonValue, ) -> impl Future { + println!("Running: {:?} :: {:?}", config, action); let config = { let mut buf: Vec = Vec::new(); { let mut serializer = JsonSerializer::new(&mut buf); - TomlValue::serialize(&self.config, &mut serializer); + TomlValue::serialize(&config, &mut serializer).unwrap(); } String::from_utf8(buf).unwrap() }; - let output = match &self.action { + let output: Box + Send> = match action { Action::Command(ref cmd) => { // TODO: allow some kind of simple variable replacement - let child = Command::new("/bin/bash") + let mut child = Command::new("/bin/bash"); + let child = child .current_dir(&temp_path) .env("DIP_ROOT", "lol") - .env("DIP_WORKDIR", temp_path) + .env("DIP_WORKDIR", &temp_path) .arg("-c") .arg(cmd) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .unwrap(); - let output = child.wait_with_output().unwrap(); - if !output.status.success() { - // TODO: get rid of unwraps - return future::err(err_msg(format!( - "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - cmd, - output.status, - String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - ))); - } - output + .stderr(Stdio::piped()); + let result = child + .output_async() + .map_err(|err| err_msg(format!("failed to spawn child: {}", err))) + .and_then(|output| { + let stdout = + String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + let stderr = + String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + future::ok(json!({ + "stdout": stdout, + "stderr": stderr, + })) + }); + Box::new(result) } Action::Exec(ref path) => { let mut child = Command::new(&path) .current_dir(&temp_path) .env("DIP_ROOT", "") - .env("DIP_WORKDIR", temp_path) + .env("DIP_WORKDIR", &temp_path) .arg("--config") .arg(config) - .stdin(Stdio::piped()) + .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn() - .unwrap(); - { - match child.stdin { - Some(ref mut stdin) => { - write!(stdin, "{}", input); + .spawn_async() + .expect("could not spawn child"); + match child.stdin() { + Some(ref mut stdin) => { + println!("writing input: '{}'", input); + write!(stdin, "{}", input); + } + None => (), + }; + let result = child + .wait_with_output() + .and_then(|output| { + if output.status.success() { + future::ok(output) + } else { + // TODO: change this + future::ok(output) } - None => return future::err(err_msg("done fucked")), - }; - } - let output = child.wait_with_output().unwrap(); - if !output.status.success() { - // TODO: get rid of unwraps - return future::err(err_msg(format!( - "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - path, - output.status, - String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - ))); - } - output + }).map(|output| { + let stdout = + String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + let stderr = + String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + println!("stdout: {}, stderr: {}", stdout, stderr); + json!({ + "stdout": stdout, + "stderr": stderr, + }) + }).map_err(|err| err_msg(format!("could not get output: {}", err))); + Box::new(result) + // .and_then(move |output| { + // if output.status.success() { + // future::ok(output) + // } else { + // // TODO: get rid of unwraps + // future::err(err_msg(format!( + // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // path, + // output.status, + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // ))) + // } + // }).map(|output| { + // let stdout = + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + // let stderr = + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + // // future::ok(json!({ + // // "stdout": stdout, + // // "stderr": stderr, + // // })) + // json!("") + // }).map_err(|err| err_msg(format!("could not get output: {}", err))) + + // let result = child + // .spawn_async() + // .expect("could not spawn child") + // .map_err(|err| err_msg(format!("failed to get output: {}", err))) + // .and_then(|child| { + // match child.stdin() { + // Some(ref mut stdin) => { + // // future::result(write!(stdin, "{}", input)) + // future::ok(0) + // } + // None => future::err(err_msg("done fucked")), + // } + // }); + + // let output = child.wait_with_output().unwrap(); + // if !output.status.success() { + // // TODO: get rid of unwraps + // return future::err(err_msg(format!( + // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // path, + // output.status, + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // ))); + // } + // output } }; + output.map(|x| (temp_path, x)) - let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); - let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - future::ok(( - temp_path, - json!({ - "stdout": stdout, - "stderr": stderr, - }), - )) + // let result = match action { + // Action::Command(ref cmd) => { + // // TODO: allow some kind of simple variable replacement + // let child = Command::new("/bin/bash") + // .current_dir(&temp_path) + // .env("DIP_ROOT", "lol") + // .env("DIP_WORKDIR", &temp_path) + // .arg("-c") + // .arg(cmd) + // .stdin(Stdio::piped()) + // .stdout(Stdio::piped()) + // .stderr(Stdio::piped()); + // child.output_async().map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|output| { + // if output.status.success() { + // future::ok(output) + // } else { + // // TODO: get rid of unwraps + // future::err(err_msg(format!( + // "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // cmd, + // output.status, + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // ))) + // } + // }) + // } + // Action::Exec(ref path) => { + // let mut child = Command::new(&path) + // .current_dir(&temp_path) + // .env("DIP_ROOT", "") + // .env("DIP_WORKDIR", &temp_path) + // .arg("--config") + // .arg(config) + // .stdin(Stdio::piped()) + // .stdout(Stdio::piped()) + // .stderr(Stdio::piped()); + // child + // .spawn_async() + // .map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|child| { + // future::ok() + // }) + // // { + // // match child.stdin { + // // Some(ref mut stdin) => { + // // write!(stdin, "{}", input); + // // } + // // None => return future::err(err_msg("done fucked")), + // // }; + // // } + // // let output = child.wait_with_output().unwrap(); + // // if !output.status.success() { + // // // TODO: get rid of unwraps + // // return future::err(err_msg(format!( + // // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // // path, + // // output.status, + // // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // // ))); + // // } + // // output + // } + // }; + + // let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + // let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + // future::ok(( + // temp_path, + // json!({ + // "stdout": stdout, + // "stderr": stderr, + // }), + // )) } } diff --git a/src/hook.rs b/src/hook.rs index 4158349..7cd897a 100644 --- a/src/hook.rs +++ b/src/hook.rs @@ -6,36 +6,37 @@ use std::sync::Arc; use failure::{err_msg, Error}; use futures::{future, stream, Future}; -use owning_ref::BoxRef; use serde_json::Value as JsonValue; use tokio::{self, prelude::*}; use toml::Value; use Handler; +use HANDLERS; pub struct Hook { name: String, - handlers: Arc>, + handlers: Vec, } impl Hook { pub fn from(name: impl Into, config: &Value) -> Result { let name = name.into(); - let handlers = Arc::new(config + let handlers = config .get("handlers") .ok_or(err_msg("No 'handlers' found."))? .as_array() .ok_or(err_msg("'handlers' is not an array."))? .iter() .map(|value: &Value| Handler::from(value)) - .collect::, _>>()?); + .collect::, _>>()?; Ok(Hook { name, handlers }) } pub fn from_file

(path: P) -> Result where P: AsRef, { - let filename = path.as_ref() + let filename = path + .as_ref() .file_name() .ok_or(err_msg("what the fuck bro"))? .to_str() @@ -53,18 +54,35 @@ impl Hook { self.name.clone() } pub fn handle(&self, req: JsonValue, temp_path: PathBuf) -> Result { - let h = self.handlers.clone(); - let it = h.iter(); - let s = stream::iter_ok(it).fold((temp_path, req), move |prev, handler| { - let (path, prev) = prev; - handler.run(path, prev) - }); - /*.fold(future::ok(req), |prev, handler| { - prev.and_then(|val| handler.run(temp_path, val)) - });*/ - let s = s.map(|_| ()).map_err(|_: Error| ()); - tokio::executor::spawn(s); + let handlers = self + .handlers + .iter() + .map(|handler| (handler.config.clone(), handler.action.clone())) + .collect::>(); + let st = stream::iter_ok::<_, Error>(handlers.into_iter()) + .fold((temp_path, req), |(path, prev), (config, action)| { + println!("executing in {:?}", &path); + Handler::run(config, action, path, prev) + }).map(|_| ()) + .map_err(|_: Error| ()); + tokio::executor::spawn(st); + // let it = stream::iter_ok::<_, Error>(handlers.iter()); + // let s = it.fold((temp_path, req), move |(path, prev), handler| { + // let result = handler.run(path.clone(), prev.clone()); + // result + // }).map(|_|()).map_err(|_| ()); + // tokio::executor::spawn(s); Ok("success".to_owned()) + // let s = stream::iter_ok(self.handlers.iter()).fold((temp_path, req), move |prev, handler| { + // let (path, prev) = prev; + // handler.run(path, prev) + // }); + // .fold(future::ok(req), |prev, handler| { + // prev.and_then(|val| handler.run(temp_path, val)) + // }); + // let s = s.map(|_| ()).map_err(|_: Error| ()); + // tokio::executor::spawn(s); + // Ok("success".to_owned()) /* Ok(self.iter() .fold(Ok(req), |prev, handler| { @@ -85,6 +103,5 @@ impl Hook { ) }) .unwrap_or_else(|err| (StatusCode::BAD_REQUEST, format!("Error: {:?}", err)))) - */ - } + */ } } diff --git a/src/lib.rs b/src/lib.rs index c60aa99..ff1e795 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ extern crate mktemp; extern crate owning_ref; extern crate serde; extern crate tokio; +extern crate tokio_process; #[macro_use] extern crate serde_json; #[macro_use] @@ -48,7 +49,8 @@ const URIPATTERN_STR: &str = r"/webhook/(?P[A-Za-z._][A-Za-z0-9._]*)"; lazy_static! { static ref URIPATTERN: Regex = Regex::new(URIPATTERN_STR).unwrap(); - // static ref HANDLERS: Mutex>> = Mutex::new(HashMap::new()); + static ref HANDLERS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); static ref PROGRAMS: Mutex> = Mutex::new(HashMap::new()); static ref HOOKS: Arc>> = Arc::new(Mutex::new(HashMap::new())); } diff --git a/src/service.rs b/src/service.rs index 9e60c8e..9ec7803 100644 --- a/src/service.rs +++ b/src/service.rs @@ -34,51 +34,41 @@ pub fn dip_service(req: Request) -> Box, Erro // TODO: filter by method as well - let headers = req.headers() + let headers = req + .headers() .clone() .into_iter() .filter_map(|(k, v)| { let key = k.unwrap().as_str().to_owned(); v.to_str().map(|value| (key, value.to_owned())).ok() - }) - .collect::>(); + }).collect::>(); let method = req.method().as_str().to_owned(); // spawn job - thread::spawn(move || { - req.into_body().concat2().map(move |body| { - let body = String::from_utf8(body.to_vec()).unwrap(); - let req_obj = json!({ - "body": body, - "headers": headers, - "method": method, - }); - let hooks = HOOKS.lock().unwrap(); - { - let mut temp_dir = Temp::new_dir().unwrap(); - let temp_path = temp_dir.to_path_buf(); - assert!(temp_path.exists()); + Box::new(req.into_body().concat2().map(move |body| { + let body = String::from_utf8(body.to_vec()).unwrap(); + let req_obj = json!({ + "body": body, + "headers": headers, + "method": method, + }); + let hooks = HOOKS.lock().unwrap(); + { + let mut temp_dir = Temp::new_dir().unwrap(); + let temp_path = temp_dir.to_path_buf(); + assert!(temp_path.exists()); - let hook = hooks.get(&name).unwrap(); - let (code, msg) = match hook.handle(req_obj, temp_path) { - Ok(msg) => (StatusCode::ACCEPTED, msg), - Err(msg) => (StatusCode::BAD_REQUEST, msg), - }; + let hook = hooks.get(&name).unwrap(); + let (code, msg) = match hook.handle(req_obj, temp_path) { + Ok(msg) => (StatusCode::ACCEPTED, msg), + Err(msg) => (StatusCode::BAD_REQUEST, msg), + }; - temp_dir.release(); - Response::builder() - .status(code) - .body(Body::from(msg)) - .unwrap_or_else(|err| Response::new(Body::from(format!("{}", err)))) - } - }) - }); - - Box::new(future::ok( - Response::builder() - .status(StatusCode::ACCEPTED) - // TODO: assign job a uuid and do some logging - .body(Body::from(format!("job {} started", -1))) - .unwrap(), - )) + temp_dir.release(); + Response::builder() + .status(code) + .body(Body::from(msg)) + .unwrap_or_else(|err| Response::new(Body::from(format!("{}", err)))) + } + })) }