This commit is contained in:
Michael Zhang 2018-08-20 20:53:37 -05:00
parent 5ad14efdaf
commit 642ff66161
No known key found for this signature in database
GPG key ID: A1B65B603268116B
6 changed files with 275 additions and 125 deletions

View file

@ -60,20 +60,24 @@ fn default_path() -> PathBuf {
PathBuf::from(".") PathBuf::from(".")
} }
fn main() -> Result<(), Error> { fn main() {
let args = Opt::from_args(); let args = Opt::from_args();
let config: Config = serde_json::from_str(&args.config)?; let config: Config = serde_json::from_str(&args.config).expect("Could not parse config.");
println!("{:?}", config); println!("{:?}", config);
let mut payload = String::new(); let mut payload = String::new();
io::stdin().read_to_string(&mut payload)?; io::stdin()
let payload: Payload = serde_json::from_str(&payload)?; .read_to_string(&mut payload)
.expect("Could not read from stdin");
let payload: Payload = serde_json::from_str(&payload)
.expect(&format!("Could not parse stdin into json: '{}'", payload));
if !config.disable_hmac_verify { if !config.disable_hmac_verify {
let secret = GenericArray::from_iter(config.secret.bytes()); let secret = GenericArray::from_iter(config.secret.bytes());
let mut mac = Hmac::<Sha1>::new(&secret); let mut mac = Hmac::<Sha1>::new(&secret);
mac.input(payload.body.as_bytes()); mac.input(payload.body.as_bytes());
let signature = mac.result() let signature = mac
.result()
.code() .code()
.into_iter() .into_iter()
.map(|b| format!("{:02x}", b)) .map(|b| format!("{:02x}", b))
@ -83,15 +87,18 @@ fn main() -> Result<(), Error> {
let auth = payload let auth = payload
.headers .headers
.get("x-hub-signature") .get("x-hub-signature")
.ok_or(err_msg("Missing auth header"))?; .ok_or(err_msg("Missing auth header"))
.expect("Missing auth header");
let left = SecStr::from(format!("sha1={}", signature)); let left = SecStr::from(format!("sha1={}", signature));
let right = SecStr::from(auth.bytes().collect::<Vec<_>>()); let right = SecStr::from(auth.bytes().collect::<Vec<_>>());
assert!(left == right, "HMAC signature didn't match",); assert!(left == right, "HMAC signature didn't match",);
} }
let payload: GithubPayload = serde_json::from_str(&payload.body)?; let payload: GithubPayload =
let mut target_path = PathBuf::from(env::var("DIP_WORKDIR")?); serde_json::from_str(&payload.body).expect("Could not parse Github input into json");
let mut target_path =
PathBuf::from(env::var("DIP_WORKDIR").expect("Could not determine working directory"));
target_path.push(&config.path); target_path.push(&config.path);
Command::new("git") Command::new("git")
.arg("clone") .arg("clone")
@ -100,6 +107,6 @@ fn main() -> Result<(), Error> {
.arg("--depth") .arg("--depth")
.arg("1") .arg("1")
.arg(&target_path) .arg(&target_path)
.output()?; .output()
Ok(()) .expect("Could not spawn process to clone");
} }

View file

@ -65,7 +65,8 @@ where
if !path.is_file() { if !path.is_file() {
continue; continue;
} }
match path.file_name() match path
.file_name()
.and_then(|s| s.to_str()) .and_then(|s| s.to_str())
.ok_or(err_msg("???")) .ok_or(err_msg("???"))
.map(|s| { .map(|s| {

View file

@ -1,20 +1,26 @@
use std::io::Write; use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::{Command, Stdio}; use std::process::{Command, Output, Stdio};
use failure::{err_msg, Error}; use failure::{err_msg, Error};
use futures::{future, Future}; use futures::{
future::{self, FutureResult},
Future,
};
use serde::Serialize; use serde::Serialize;
use serde_json::{Serializer as JsonSerializer, Value as JsonValue}; use serde_json::{Serializer as JsonSerializer, Value as JsonValue};
use tokio_process::CommandExt;
use toml::Value as TomlValue; use toml::Value as TomlValue;
use PROGRAMS; use PROGRAMS;
#[derive(Clone, Debug)]
pub struct Handler { pub struct Handler {
config: TomlValue, pub config: TomlValue,
action: Action, pub action: Action,
} }
#[derive(Clone, Debug)]
pub enum Action { pub enum Action {
Command(String), Command(String),
Exec(PathBuf), Exec(PathBuf),
@ -48,8 +54,7 @@ impl Handler {
value value
.canonicalize() .canonicalize()
.map_err(|_| err_msg("failed to canonicalize the path")) .map_err(|_| err_msg("failed to canonicalize the path"))
}) }).map(|value| value.clone())?;
.map(|value| value.clone())?;
Action::Exec(program) Action::Exec(program)
} }
}; };
@ -58,89 +63,217 @@ impl Handler {
} }
pub fn run( pub fn run(
&self, config: TomlValue,
action: Action,
temp_path: PathBuf, temp_path: PathBuf,
input: JsonValue, input: JsonValue,
) -> impl Future<Item = (PathBuf, JsonValue), Error = Error> { ) -> impl Future<Item = (PathBuf, JsonValue), Error = Error> {
println!("Running: {:?} :: {:?}", config, action);
let config = { let config = {
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
{ {
let mut serializer = JsonSerializer::new(&mut buf); let mut serializer = JsonSerializer::new(&mut buf);
TomlValue::serialize(&self.config, &mut serializer); TomlValue::serialize(&config, &mut serializer).unwrap();
} }
String::from_utf8(buf).unwrap() String::from_utf8(buf).unwrap()
}; };
let output = match &self.action { let output: Box<Future<Item = JsonValue, Error = Error> + Send> = match action {
Action::Command(ref cmd) => { Action::Command(ref cmd) => {
// TODO: allow some kind of simple variable replacement // TODO: allow some kind of simple variable replacement
let child = Command::new("/bin/bash") let mut child = Command::new("/bin/bash");
let child = child
.current_dir(&temp_path) .current_dir(&temp_path)
.env("DIP_ROOT", "lol") .env("DIP_ROOT", "lol")
.env("DIP_WORKDIR", temp_path) .env("DIP_WORKDIR", &temp_path)
.arg("-c") .arg("-c")
.arg(cmd) .arg(cmd)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped());
.spawn() let result = child
.unwrap(); .output_async()
let output = child.wait_with_output().unwrap(); .map_err(|err| err_msg(format!("failed to spawn child: {}", err)))
if !output.status.success() { .and_then(|output| {
// TODO: get rid of unwraps let stdout =
return future::err(err_msg(format!( String::from_utf8(output.stdout).unwrap_or_else(|_| String::new());
"Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", let stderr =
cmd, String::from_utf8(output.stderr).unwrap_or_else(|_| String::new());
output.status, future::ok(json!({
String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), "stdout": stdout,
String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) "stderr": stderr,
))); }))
} });
output Box::new(result)
} }
Action::Exec(ref path) => { Action::Exec(ref path) => {
let mut child = Command::new(&path) let mut child = Command::new(&path)
.current_dir(&temp_path) .current_dir(&temp_path)
.env("DIP_ROOT", "") .env("DIP_ROOT", "")
.env("DIP_WORKDIR", temp_path) .env("DIP_WORKDIR", &temp_path)
.arg("--config") .arg("--config")
.arg(config) .arg(config)
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.spawn() .spawn_async()
.unwrap(); .expect("could not spawn child");
{ match child.stdin() {
match child.stdin { Some(ref mut stdin) => {
Some(ref mut stdin) => { println!("writing input: '{}'", input);
write!(stdin, "{}", input); write!(stdin, "{}", input);
}
None => (),
};
let result = child
.wait_with_output()
.and_then(|output| {
if output.status.success() {
future::ok(output)
} else {
// TODO: change this
future::ok(output)
} }
None => return future::err(err_msg("done fucked")), }).map(|output| {
}; let stdout =
} String::from_utf8(output.stdout).unwrap_or_else(|_| String::new());
let output = child.wait_with_output().unwrap(); let stderr =
if !output.status.success() { String::from_utf8(output.stderr).unwrap_or_else(|_| String::new());
// TODO: get rid of unwraps println!("stdout: {}, stderr: {}", stdout, stderr);
return future::err(err_msg(format!( json!({
"'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", "stdout": stdout,
path, "stderr": stderr,
output.status, })
String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), }).map_err(|err| err_msg(format!("could not get output: {}", err)));
String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) Box::new(result)
))); // .and_then(move |output| {
} // if output.status.success() {
output // future::ok(output)
// } else {
// // TODO: get rid of unwraps
// future::err(err_msg(format!(
// "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}",
// path,
// output.status,
// String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()),
// String::from_utf8(output.stderr).unwrap_or_else(|_| String::new())
// )))
// }
// }).map(|output| {
// let stdout =
// String::from_utf8(output.stdout).unwrap_or_else(|_| String::new());
// let stderr =
// String::from_utf8(output.stderr).unwrap_or_else(|_| String::new());
// // future::ok(json!({
// // "stdout": stdout,
// // "stderr": stderr,
// // }))
// json!("")
// }).map_err(|err| err_msg(format!("could not get output: {}", err)))
// let result = child
// .spawn_async()
// .expect("could not spawn child")
// .map_err(|err| err_msg(format!("failed to get output: {}", err)))
// .and_then(|child| {
// match child.stdin() {
// Some(ref mut stdin) => {
// // future::result(write!(stdin, "{}", input))
// future::ok(0)
// }
// None => future::err(err_msg("done fucked")),
// }
// });
// let output = child.wait_with_output().unwrap();
// if !output.status.success() {
// // TODO: get rid of unwraps
// return future::err(err_msg(format!(
// "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}",
// path,
// output.status,
// String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()),
// String::from_utf8(output.stderr).unwrap_or_else(|_| String::new())
// )));
// }
// output
} }
}; };
output.map(|x| (temp_path, x))
let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); // let result = match action {
let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); // Action::Command(ref cmd) => {
future::ok(( // // TODO: allow some kind of simple variable replacement
temp_path, // let child = Command::new("/bin/bash")
json!({ // .current_dir(&temp_path)
"stdout": stdout, // .env("DIP_ROOT", "lol")
"stderr": stderr, // .env("DIP_WORKDIR", &temp_path)
}), // .arg("-c")
)) // .arg(cmd)
// .stdin(Stdio::piped())
// .stdout(Stdio::piped())
// .stderr(Stdio::piped());
// child.output_async().map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|output| {
// if output.status.success() {
// future::ok(output)
// } else {
// // TODO: get rid of unwraps
// future::err(err_msg(format!(
// "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}",
// cmd,
// output.status,
// String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()),
// String::from_utf8(output.stderr).unwrap_or_else(|_| String::new())
// )))
// }
// })
// }
// Action::Exec(ref path) => {
// let mut child = Command::new(&path)
// .current_dir(&temp_path)
// .env("DIP_ROOT", "")
// .env("DIP_WORKDIR", &temp_path)
// .arg("--config")
// .arg(config)
// .stdin(Stdio::piped())
// .stdout(Stdio::piped())
// .stderr(Stdio::piped());
// child
// .spawn_async()
// .map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|child| {
// future::ok()
// })
// // {
// // match child.stdin {
// // Some(ref mut stdin) => {
// // write!(stdin, "{}", input);
// // }
// // None => return future::err(err_msg("done fucked")),
// // };
// // }
// // let output = child.wait_with_output().unwrap();
// // if !output.status.success() {
// // // TODO: get rid of unwraps
// // return future::err(err_msg(format!(
// // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}",
// // path,
// // output.status,
// // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()),
// // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new())
// // )));
// // }
// // output
// }
// };
// let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new());
// let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new());
// future::ok((
// temp_path,
// json!({
// "stdout": stdout,
// "stderr": stderr,
// }),
// ))
} }
} }

View file

@ -6,36 +6,37 @@ use std::sync::Arc;
use failure::{err_msg, Error}; use failure::{err_msg, Error};
use futures::{future, stream, Future}; use futures::{future, stream, Future};
use owning_ref::BoxRef;
use serde_json::Value as JsonValue; use serde_json::Value as JsonValue;
use tokio::{self, prelude::*}; use tokio::{self, prelude::*};
use toml::Value; use toml::Value;
use Handler; use Handler;
use HANDLERS;
pub struct Hook { pub struct Hook {
name: String, name: String,
handlers: Arc<Vec<Handler>>, handlers: Vec<Handler>,
} }
impl Hook { impl Hook {
pub fn from(name: impl Into<String>, config: &Value) -> Result<Self, Error> { pub fn from(name: impl Into<String>, config: &Value) -> Result<Self, Error> {
let name = name.into(); let name = name.into();
let handlers = Arc::new(config let handlers = config
.get("handlers") .get("handlers")
.ok_or(err_msg("No 'handlers' found."))? .ok_or(err_msg("No 'handlers' found."))?
.as_array() .as_array()
.ok_or(err_msg("'handlers' is not an array."))? .ok_or(err_msg("'handlers' is not an array."))?
.iter() .iter()
.map(|value: &Value| Handler::from(value)) .map(|value: &Value| Handler::from(value))
.collect::<Result<Vec<_>, _>>()?); .collect::<Result<Vec<_>, _>>()?;
Ok(Hook { name, handlers }) Ok(Hook { name, handlers })
} }
pub fn from_file<P>(path: P) -> Result<Hook, Error> pub fn from_file<P>(path: P) -> Result<Hook, Error>
where where
P: AsRef<Path>, P: AsRef<Path>,
{ {
let filename = path.as_ref() let filename = path
.as_ref()
.file_name() .file_name()
.ok_or(err_msg("what the fuck bro"))? .ok_or(err_msg("what the fuck bro"))?
.to_str() .to_str()
@ -53,18 +54,35 @@ impl Hook {
self.name.clone() self.name.clone()
} }
pub fn handle(&self, req: JsonValue, temp_path: PathBuf) -> Result<String, String> { pub fn handle(&self, req: JsonValue, temp_path: PathBuf) -> Result<String, String> {
let h = self.handlers.clone(); let handlers = self
let it = h.iter(); .handlers
let s = stream::iter_ok(it).fold((temp_path, req), move |prev, handler| { .iter()
let (path, prev) = prev; .map(|handler| (handler.config.clone(), handler.action.clone()))
handler.run(path, prev) .collect::<Vec<_>>();
}); let st = stream::iter_ok::<_, Error>(handlers.into_iter())
/*.fold(future::ok(req), |prev, handler| { .fold((temp_path, req), |(path, prev), (config, action)| {
prev.and_then(|val| handler.run(temp_path, val)) println!("executing in {:?}", &path);
});*/ Handler::run(config, action, path, prev)
let s = s.map(|_| ()).map_err(|_: Error| ()); }).map(|_| ())
tokio::executor::spawn(s); .map_err(|_: Error| ());
tokio::executor::spawn(st);
// let it = stream::iter_ok::<_, Error>(handlers.iter());
// let s = it.fold((temp_path, req), move |(path, prev), handler| {
// let result = handler.run(path.clone(), prev.clone());
// result
// }).map(|_|()).map_err(|_| ());
// tokio::executor::spawn(s);
Ok("success".to_owned()) Ok("success".to_owned())
// let s = stream::iter_ok(self.handlers.iter()).fold((temp_path, req), move |prev, handler| {
// let (path, prev) = prev;
// handler.run(path, prev)
// });
// .fold(future::ok(req), |prev, handler| {
// prev.and_then(|val| handler.run(temp_path, val))
// });
// let s = s.map(|_| ()).map_err(|_: Error| ());
// tokio::executor::spawn(s);
// Ok("success".to_owned())
/* /*
Ok(self.iter() Ok(self.iter()
.fold(Ok(req), |prev, handler| { .fold(Ok(req), |prev, handler| {
@ -85,6 +103,5 @@ impl Hook {
) )
}) })
.unwrap_or_else(|err| (StatusCode::BAD_REQUEST, format!("Error: {:?}", err)))) .unwrap_or_else(|err| (StatusCode::BAD_REQUEST, format!("Error: {:?}", err))))
*/ */ }
}
} }

View file

@ -8,6 +8,7 @@ extern crate mktemp;
extern crate owning_ref; extern crate owning_ref;
extern crate serde; extern crate serde;
extern crate tokio; extern crate tokio;
extern crate tokio_process;
#[macro_use] #[macro_use]
extern crate serde_json; extern crate serde_json;
#[macro_use] #[macro_use]
@ -48,7 +49,8 @@ const URIPATTERN_STR: &str = r"/webhook/(?P<name>[A-Za-z._][A-Za-z0-9._]*)";
lazy_static! { lazy_static! {
static ref URIPATTERN: Regex = Regex::new(URIPATTERN_STR).unwrap(); static ref URIPATTERN: Regex = Regex::new(URIPATTERN_STR).unwrap();
// static ref HANDLERS: Mutex<HashMap<String, Box<Handler>>> = Mutex::new(HashMap::new()); static ref HANDLERS: Arc<Mutex<HashMap<String, Handler>>> =
Arc::new(Mutex::new(HashMap::new()));
static ref PROGRAMS: Mutex<HashMap<String, PathBuf>> = Mutex::new(HashMap::new()); static ref PROGRAMS: Mutex<HashMap<String, PathBuf>> = Mutex::new(HashMap::new());
static ref HOOKS: Arc<Mutex<HashMap<String, Hook>>> = Arc::new(Mutex::new(HashMap::new())); static ref HOOKS: Arc<Mutex<HashMap<String, Hook>>> = Arc::new(Mutex::new(HashMap::new()));
} }

View file

@ -34,51 +34,41 @@ pub fn dip_service(req: Request<Body>) -> Box<Future<Item = Response<Body>, Erro
// TODO: filter by method as well // TODO: filter by method as well
let headers = req.headers() let headers = req
.headers()
.clone() .clone()
.into_iter() .into_iter()
.filter_map(|(k, v)| { .filter_map(|(k, v)| {
let key = k.unwrap().as_str().to_owned(); let key = k.unwrap().as_str().to_owned();
v.to_str().map(|value| (key, value.to_owned())).ok() v.to_str().map(|value| (key, value.to_owned())).ok()
}) }).collect::<HashMap<_, _>>();
.collect::<HashMap<_, _>>();
let method = req.method().as_str().to_owned(); let method = req.method().as_str().to_owned();
// spawn job // spawn job
thread::spawn(move || { Box::new(req.into_body().concat2().map(move |body| {
req.into_body().concat2().map(move |body| { let body = String::from_utf8(body.to_vec()).unwrap();
let body = String::from_utf8(body.to_vec()).unwrap(); let req_obj = json!({
let req_obj = json!({ "body": body,
"body": body, "headers": headers,
"headers": headers, "method": method,
"method": method, });
}); let hooks = HOOKS.lock().unwrap();
let hooks = HOOKS.lock().unwrap(); {
{ let mut temp_dir = Temp::new_dir().unwrap();
let mut temp_dir = Temp::new_dir().unwrap(); let temp_path = temp_dir.to_path_buf();
let temp_path = temp_dir.to_path_buf(); assert!(temp_path.exists());
assert!(temp_path.exists());
let hook = hooks.get(&name).unwrap(); let hook = hooks.get(&name).unwrap();
let (code, msg) = match hook.handle(req_obj, temp_path) { let (code, msg) = match hook.handle(req_obj, temp_path) {
Ok(msg) => (StatusCode::ACCEPTED, msg), Ok(msg) => (StatusCode::ACCEPTED, msg),
Err(msg) => (StatusCode::BAD_REQUEST, msg), Err(msg) => (StatusCode::BAD_REQUEST, msg),
}; };
temp_dir.release(); temp_dir.release();
Response::builder() Response::builder()
.status(code) .status(code)
.body(Body::from(msg)) .body(Body::from(msg))
.unwrap_or_else(|err| Response::new(Body::from(format!("{}", err)))) .unwrap_or_else(|err| Response::new(Body::from(format!("{}", err))))
} }
}) }))
});
Box::new(future::ok(
Response::builder()
.status(StatusCode::ACCEPTED)
// TODO: assign job a uuid and do some logging
.body(Body::from(format!("job {} started", -1)))
.unwrap(),
))
} }