From 39a60b7d30688e6d33bc8f5892ad4cdf535738dd Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 17 Aug 2018 01:06:35 +0000 Subject: [PATCH 1/6] wip --- Cargo.lock | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 ++ src/handler.rs | 35 +++++++++++++--------- src/hook.rs | 20 ++++++------- src/lib.rs | 1 + src/service.rs | 5 +++- 6 files changed, 117 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c41c95..18b5940 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,8 @@ dependencies = [ "serde_json 1.0.26 (registry+https://github.com/rust-lang/crates.io-index)", "sha-1 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-process 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "toml 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -483,6 +485,27 @@ dependencies = [ "winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "mio-named-pipes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "log 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.15 (registry+https://github.com/rust-lang/crates.io-index)", + "miow 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "mio-uds" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.15 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "miow" version = "0.1.5" @@ -505,6 +528,15 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "miow" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "socket2 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "mktemp" version = "0.3.1" @@ -734,6 +766,17 @@ name = "slab" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "socket2" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "redox_syscall 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "string" version = "0.1.1" @@ -876,6 +919,21 @@ dependencies = [ "log 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-process" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.15 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-signal 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-reactor" version = "0.1.3" @@ -889,6 +947,21 @@ dependencies = [ "tokio-io 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tokio-signal" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", + "mio 0.6.15 (registry+https://github.com/rust-lang/crates.io-index)", + "mio-uds 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "tokio-tcp" version = "0.1.1" @@ -1121,8 +1194,11 @@ dependencies = [ "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" "checksum mio 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a637d1ca14eacae06296a008fa7ad955347e34efcb5891cfd8ba05491a37907e" "checksum mio 0.6.15 (registry+https://github.com/rust-lang/crates.io-index)" = "4fcfcb32d63961fb6f367bfd5d21e4600b92cd310f71f9dca25acae196eb1560" +"checksum mio-named-pipes 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "f5e374eff525ce1c5b7687c4cef63943e7686524a387933ad27ca7ec43779cb3" +"checksum mio-uds 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "84c7b5caa3a118a6e34dbac36504503b1e8dc5835e833306b9d6af0e05929f79" "checksum miow 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "3e690c5df6b2f60acd45d56378981e827ff8295562fc8d34f573deb267a59cd1" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum miow 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9224c91f82b3c47cf53dcf78dfaa20d6888fbcc5d272d5f2fcdf8a697f3c987d" "checksum mktemp 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "77001ceb9eed65439f3dc2a2543f9ba1417d912686bf224a7738d0966e6dcd69" "checksum net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)" = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" "checksum nix 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfb3ddedaa14746434a02041940495bf11325c22f6d36125d3bdd56090d50a79" @@ -1152,6 +1228,7 @@ dependencies = [ "checksum sha-1 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "51b9d1f3b5de8a167ab06834a7c883bd197f2191e1dda1a22d9ccfeedbf9aded" "checksum slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e" "checksum slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5f9776d6b986f77b35c6cf846c11ad986ff128fe0b2b63a3628e3755e8d3102d" +"checksum socket2 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "962a516af4d3a7c272cb3a1d50a8cc4e5b41802e4ad54cfb7bee8ba61d37d703" "checksum string 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00caf261d6f90f588f8450b8e1230fa0d5be49ee6140fdfbcb55335aff350970" "checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" "checksum structopt 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8e9ad6a11096cbecdcca0cc6aa403fdfdbaeda2fb3323a39c98e6a166a1e45a" @@ -1167,7 +1244,9 @@ dependencies = [ "checksum tokio-executor 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "424f0c87ecd66b863045d84e384cb7ce0ae384d8b065b9f0363d29c0d1b30b2f" "checksum tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b5cbe4ca6e71cb0b62a66e4e6f53a8c06a6eefe46cc5f665ad6f274c9906f135" "checksum tokio-io 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "a5c9635ee806f26d302b8baa1e145689a280d8f5aa8d0552e7344808da54cc21" +"checksum tokio-process 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0832648d1ff7ca42c06ca45dc76797b92c56500de828e33c77276fa1449947b6" "checksum tokio-reactor 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8703a5762ff6913510dc64272c714c4389ffd8c4b3cf602879b8bd14ff06b604" +"checksum tokio-signal 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "52ba234f769af7a85b0cf579da820a82675940054b99819beba2a01a356f5f8e" "checksum tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5b4c329b47f071eb8a746040465fa751bd95e4716e98daef6a9b4e434c17d565" "checksum tokio-threadpool 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "24ab84f574027b0e875378f31575cf175360891919e93a3490f07e76e00e4efb" "checksum tokio-timer 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "1c76b4e97a4f61030edff8bd272364e4f731b9f54c7307eb4eb733c3926eb96a" diff --git a/Cargo.toml b/Cargo.toml index b0c4e6d..06b6648 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ serde = "1.0" serde_derive = "1.0" serde_json = "1.0" structopt = "0.2" +tokio = "0.1" +tokio-process = "0.2" toml = "0.4" walkdir = "2.2" diff --git a/src/handler.rs b/src/handler.rs index b96035a..777dec9 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use std::process::{Command, Stdio}; use failure::{err_msg, Error}; +use futures::{future, Future}; use serde::Serialize; use serde_json::{Serializer as JsonSerializer, Value as JsonValue}; use toml::Value as TomlValue; @@ -56,12 +57,16 @@ impl Handler { Ok(Handler { config, action }) } - pub fn run(&self, temp_path: &PathBuf, input: JsonValue) -> Result { + pub fn run( + &self, + temp_path: &PathBuf, + input: JsonValue, + ) -> impl Future { let config = { let mut buf: Vec = Vec::new(); { let mut serializer = JsonSerializer::new(&mut buf); - TomlValue::serialize(&self.config, &mut serializer)?; + TomlValue::serialize(&self.config, &mut serializer); } String::from_utf8(buf).unwrap() }; @@ -78,11 +83,12 @@ impl Handler { .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn()?; - let output = child.wait_with_output()?; + .spawn() + .unwrap(); + let output = child.wait_with_output().unwrap(); if !output.status.success() { // TODO: get rid of unwraps - return Err(err_msg(format!( + return future::err(err_msg(format!( "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", cmd, output.status, @@ -102,19 +108,20 @@ impl Handler { .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn()?; + .spawn() + .unwrap(); { match child.stdin { Some(ref mut stdin) => { - write!(stdin, "{}", input)?; + write!(stdin, "{}", input); } - None => bail!("done fucked"), + None => return future::err(err_msg("done fucked")), }; } - let output = child.wait_with_output()?; + let output = child.wait_with_output().unwrap(); if !output.status.success() { // TODO: get rid of unwraps - return Err(err_msg(format!( + return future::err(err_msg(format!( "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", path, output.status, @@ -128,9 +135,9 @@ impl Handler { let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - Ok(json!({ - "stdout": stdout, - "stderr": stderr, - })) + future::ok(json!({ + "stdout": stdout, + "stderr": stderr, + })) } } diff --git a/src/hook.rs b/src/hook.rs index 15673fb..186bb7d 100644 --- a/src/hook.rs +++ b/src/hook.rs @@ -4,8 +4,9 @@ use std::path::{Path, PathBuf}; use std::slice::Iter; use failure::{err_msg, Error}; -use hyper::StatusCode; +use futures::{future, Future}; use serde_json::Value as JsonValue; +use tokio::{self, prelude::*}; use toml::Value; use Handler; @@ -49,20 +50,18 @@ impl Hook { pub fn get_name(&self) -> String { self.name.clone() } - pub fn iter(&self) -> Iter { - self.handlers.iter() - } - pub fn handle( - &self, - req: JsonValue, - temp_path: &PathBuf, - ) -> Result<(StatusCode, String), Error> { + pub fn handle(&self, req: JsonValue, temp_path: &PathBuf) -> Result { + let fut = self.handlers.iter().fold(future::ok(req), |prev, handler| { + prev.and_then(|val| handler.run(temp_path, val)) + }); + tokio::executor::spawn(fut.and_then(|_| future::ok(()))); + Ok("success".to_owned()) + /* Ok(self.iter() .fold(Ok(req), |prev, handler| { prev.and_then(|val| { println!("Running {}...", handler.config()); let result = handler.run(&temp_path, val); - println!("{:?}", result); result }) }) @@ -77,5 +76,6 @@ impl Hook { ) }) .unwrap_or_else(|err| (StatusCode::BAD_REQUEST, format!("Error: {:?}", err)))) + */ } } diff --git a/src/lib.rs b/src/lib.rs index 047a6aa..b782436 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ extern crate futures; extern crate hyper; extern crate mktemp; extern crate serde; +extern crate tokio; #[macro_use] extern crate serde_json; #[macro_use] diff --git a/src/service.rs b/src/service.rs index 8afaa00..30df045 100644 --- a/src/service.rs +++ b/src/service.rs @@ -60,7 +60,10 @@ pub fn dip_service(req: Request) -> Box, Erro assert!(temp_path.exists()); let hook = hooks.get(&name).unwrap(); - let (code, msg) = hook.handle(req_obj, &temp_path).unwrap(); + let (code, msg) = match hook.handle(req_obj, &temp_path) { + Ok(msg) => (StatusCode::ACCEPTED, msg), + Err(msg) => (StatusCode::BAD_REQUEST, msg), + }; temp_dir.release(); Response::builder() From 5ad14efdafa63390a20a64eabc673c382c2cb1b4 Mon Sep 17 00:00:00 2001 From: Michael Date: Fri, 17 Aug 2018 02:04:52 +0000 Subject: [PATCH 2/6] iwll figure out owning ref soon --- Cargo.lock | 16 ++++++++++++++++ Cargo.toml | 1 + src/handler.rs | 11 +++++++---- src/hook.rs | 25 +++++++++++++++++-------- src/lib.rs | 1 + src/service.rs | 2 +- 6 files changed, 43 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 18b5940..15d06a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,6 +194,7 @@ dependencies = [ "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "mktemp 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", "notify 4.0.4 (registry+https://github.com/rust-lang/crates.io-index)", + "owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "regex 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "secstr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.71 (registry+https://github.com/rust-lang/crates.io-index)", @@ -594,6 +595,14 @@ dependencies = [ "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "owning_ref" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "proc-macro2" version = "0.4.11" @@ -777,6 +786,11 @@ dependencies = [ "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "stable_deref_trait" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "string" version = "0.1.1" @@ -1205,6 +1219,7 @@ dependencies = [ "checksum nodrop 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "9a2228dca57108069a5262f2ed8bd2e82496d2e074a06d1ccc7ce1687b6ae0a2" "checksum notify 4.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d023ef40ca7680784b07be3f49913e1ea176da1b63949f2eb2fed96438bd7f42" "checksum num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c51a3322e4bca9d212ad9a158a02abc6934d005490c054a2778df73a70aa0a30" +"checksum owning_ref 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "cdf84f41639e037b484f93433aa3897863b561ed65c6e59c7073d7c561710f37" "checksum proc-macro2 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "762eea716b821300a86da08870a64b597304866ceb9f54a11d67b4cf56459c6a" "checksum quote 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)" = "ed7d650913520df631972f21e104a4fa2f9c82a14afc65d17b388a2e29731e7c" "checksum rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "15a732abf9d20f0ad8eeb6f909bf6868722d9a06e1e50802b6a70351f40b4eb1" @@ -1229,6 +1244,7 @@ dependencies = [ "checksum slab 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d807fd58c4181bbabed77cb3b891ba9748241a552bcc5be698faaebefc54f46e" "checksum slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5f9776d6b986f77b35c6cf846c11ad986ff128fe0b2b63a3628e3755e8d3102d" "checksum socket2 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "962a516af4d3a7c272cb3a1d50a8cc4e5b41802e4ad54cfb7bee8ba61d37d703" +"checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" "checksum string 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00caf261d6f90f588f8450b8e1230fa0d5be49ee6140fdfbcb55335aff350970" "checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" "checksum structopt 0.2.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8e9ad6a11096cbecdcca0cc6aa403fdfdbaeda2fb3323a39c98e6a166a1e45a" diff --git a/Cargo.toml b/Cargo.toml index 06b6648..f0b6e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ hyper = "0.12" mktemp = "0.3" lazy_static = "1.1" notify = "4.0" +owning_ref = "0.3" regex = "1.0" serde = "1.0" serde_derive = "1.0" diff --git a/src/handler.rs b/src/handler.rs index 777dec9..bcd41be 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -59,9 +59,9 @@ impl Handler { pub fn run( &self, - temp_path: &PathBuf, + temp_path: PathBuf, input: JsonValue, - ) -> impl Future { + ) -> impl Future { let config = { let mut buf: Vec = Vec::new(); { @@ -135,9 +135,12 @@ impl Handler { let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - future::ok(json!({ + future::ok(( + temp_path, + json!({ "stdout": stdout, "stderr": stderr, - })) + }), + )) } } diff --git a/src/hook.rs b/src/hook.rs index 186bb7d..4158349 100644 --- a/src/hook.rs +++ b/src/hook.rs @@ -2,9 +2,11 @@ use std::fs::File; use std::io::Read; use std::path::{Path, PathBuf}; use std::slice::Iter; +use std::sync::Arc; use failure::{err_msg, Error}; -use futures::{future, Future}; +use futures::{future, stream, Future}; +use owning_ref::BoxRef; use serde_json::Value as JsonValue; use tokio::{self, prelude::*}; use toml::Value; @@ -13,20 +15,20 @@ use Handler; pub struct Hook { name: String, - handlers: Vec, + handlers: Arc>, } impl Hook { pub fn from(name: impl Into, config: &Value) -> Result { let name = name.into(); - let handlers = config + let handlers = Arc::new(config .get("handlers") .ok_or(err_msg("No 'handlers' found."))? .as_array() .ok_or(err_msg("'handlers' is not an array."))? .iter() .map(|value: &Value| Handler::from(value)) - .collect::, _>>()?; + .collect::, _>>()?); Ok(Hook { name, handlers }) } pub fn from_file

(path: P) -> Result @@ -50,11 +52,18 @@ impl Hook { pub fn get_name(&self) -> String { self.name.clone() } - pub fn handle(&self, req: JsonValue, temp_path: &PathBuf) -> Result { - let fut = self.handlers.iter().fold(future::ok(req), |prev, handler| { - prev.and_then(|val| handler.run(temp_path, val)) + pub fn handle(&self, req: JsonValue, temp_path: PathBuf) -> Result { + let h = self.handlers.clone(); + let it = h.iter(); + let s = stream::iter_ok(it).fold((temp_path, req), move |prev, handler| { + let (path, prev) = prev; + handler.run(path, prev) }); - tokio::executor::spawn(fut.and_then(|_| future::ok(()))); + /*.fold(future::ok(req), |prev, handler| { + prev.and_then(|val| handler.run(temp_path, val)) + });*/ + let s = s.map(|_| ()).map_err(|_: Error| ()); + tokio::executor::spawn(s); Ok("success".to_owned()) /* Ok(self.iter() diff --git a/src/lib.rs b/src/lib.rs index b782436..c60aa99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ extern crate failure; extern crate futures; extern crate hyper; extern crate mktemp; +extern crate owning_ref; extern crate serde; extern crate tokio; #[macro_use] diff --git a/src/service.rs b/src/service.rs index 30df045..9e60c8e 100644 --- a/src/service.rs +++ b/src/service.rs @@ -60,7 +60,7 @@ pub fn dip_service(req: Request) -> Box, Erro assert!(temp_path.exists()); let hook = hooks.get(&name).unwrap(); - let (code, msg) = match hook.handle(req_obj, &temp_path) { + let (code, msg) = match hook.handle(req_obj, temp_path) { Ok(msg) => (StatusCode::ACCEPTED, msg), Err(msg) => (StatusCode::BAD_REQUEST, msg), }; From 642ff6616191be8ab451f67b63eac852f5dd1bed Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Mon, 20 Aug 2018 20:53:37 -0500 Subject: [PATCH 3/6] wip --- examples/github.rs | 27 +++-- src/config.rs | 3 +- src/handler.rs | 249 ++++++++++++++++++++++++++++++++++----------- src/hook.rs | 53 ++++++---- src/lib.rs | 4 +- src/service.rs | 64 +++++------- 6 files changed, 275 insertions(+), 125 deletions(-) diff --git a/examples/github.rs b/examples/github.rs index 2574d9f..0b933f0 100644 --- a/examples/github.rs +++ b/examples/github.rs @@ -60,20 +60,24 @@ fn default_path() -> PathBuf { PathBuf::from(".") } -fn main() -> Result<(), Error> { +fn main() { let args = Opt::from_args(); - let config: Config = serde_json::from_str(&args.config)?; + let config: Config = serde_json::from_str(&args.config).expect("Could not parse config."); println!("{:?}", config); let mut payload = String::new(); - io::stdin().read_to_string(&mut payload)?; - let payload: Payload = serde_json::from_str(&payload)?; + io::stdin() + .read_to_string(&mut payload) + .expect("Could not read from stdin"); + let payload: Payload = serde_json::from_str(&payload) + .expect(&format!("Could not parse stdin into json: '{}'", payload)); if !config.disable_hmac_verify { let secret = GenericArray::from_iter(config.secret.bytes()); let mut mac = Hmac::::new(&secret); mac.input(payload.body.as_bytes()); - let signature = mac.result() + let signature = mac + .result() .code() .into_iter() .map(|b| format!("{:02x}", b)) @@ -83,15 +87,18 @@ fn main() -> Result<(), Error> { let auth = payload .headers .get("x-hub-signature") - .ok_or(err_msg("Missing auth header"))?; + .ok_or(err_msg("Missing auth header")) + .expect("Missing auth header"); let left = SecStr::from(format!("sha1={}", signature)); let right = SecStr::from(auth.bytes().collect::>()); assert!(left == right, "HMAC signature didn't match",); } - let payload: GithubPayload = serde_json::from_str(&payload.body)?; - let mut target_path = PathBuf::from(env::var("DIP_WORKDIR")?); + let payload: GithubPayload = + serde_json::from_str(&payload.body).expect("Could not parse Github input into json"); + let mut target_path = + PathBuf::from(env::var("DIP_WORKDIR").expect("Could not determine working directory")); target_path.push(&config.path); Command::new("git") .arg("clone") @@ -100,6 +107,6 @@ fn main() -> Result<(), Error> { .arg("--depth") .arg("1") .arg(&target_path) - .output()?; - Ok(()) + .output() + .expect("Could not spawn process to clone"); } diff --git a/src/config.rs b/src/config.rs index fd8c5a5..1ff28a3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,7 +65,8 @@ where if !path.is_file() { continue; } - match path.file_name() + match path + .file_name() .and_then(|s| s.to_str()) .ok_or(err_msg("???")) .map(|s| { diff --git a/src/handler.rs b/src/handler.rs index bcd41be..76b4ce8 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,20 +1,26 @@ use std::io::Write; use std::path::PathBuf; -use std::process::{Command, Stdio}; +use std::process::{Command, Output, Stdio}; use failure::{err_msg, Error}; -use futures::{future, Future}; +use futures::{ + future::{self, FutureResult}, + Future, +}; use serde::Serialize; use serde_json::{Serializer as JsonSerializer, Value as JsonValue}; +use tokio_process::CommandExt; use toml::Value as TomlValue; use PROGRAMS; +#[derive(Clone, Debug)] pub struct Handler { - config: TomlValue, - action: Action, + pub config: TomlValue, + pub action: Action, } +#[derive(Clone, Debug)] pub enum Action { Command(String), Exec(PathBuf), @@ -48,8 +54,7 @@ impl Handler { value .canonicalize() .map_err(|_| err_msg("failed to canonicalize the path")) - }) - .map(|value| value.clone())?; + }).map(|value| value.clone())?; Action::Exec(program) } }; @@ -58,89 +63,217 @@ impl Handler { } pub fn run( - &self, + config: TomlValue, + action: Action, temp_path: PathBuf, input: JsonValue, ) -> impl Future { + println!("Running: {:?} :: {:?}", config, action); let config = { let mut buf: Vec = Vec::new(); { let mut serializer = JsonSerializer::new(&mut buf); - TomlValue::serialize(&self.config, &mut serializer); + TomlValue::serialize(&config, &mut serializer).unwrap(); } String::from_utf8(buf).unwrap() }; - let output = match &self.action { + let output: Box + Send> = match action { Action::Command(ref cmd) => { // TODO: allow some kind of simple variable replacement - let child = Command::new("/bin/bash") + let mut child = Command::new("/bin/bash"); + let child = child .current_dir(&temp_path) .env("DIP_ROOT", "lol") - .env("DIP_WORKDIR", temp_path) + .env("DIP_WORKDIR", &temp_path) .arg("-c") .arg(cmd) .stdin(Stdio::piped()) .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn() - .unwrap(); - let output = child.wait_with_output().unwrap(); - if !output.status.success() { - // TODO: get rid of unwraps - return future::err(err_msg(format!( - "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - cmd, - output.status, - String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - ))); - } - output + .stderr(Stdio::piped()); + let result = child + .output_async() + .map_err(|err| err_msg(format!("failed to spawn child: {}", err))) + .and_then(|output| { + let stdout = + String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + let stderr = + String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + future::ok(json!({ + "stdout": stdout, + "stderr": stderr, + })) + }); + Box::new(result) } Action::Exec(ref path) => { let mut child = Command::new(&path) .current_dir(&temp_path) .env("DIP_ROOT", "") - .env("DIP_WORKDIR", temp_path) + .env("DIP_WORKDIR", &temp_path) .arg("--config") .arg(config) - .stdin(Stdio::piped()) + .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) - .spawn() - .unwrap(); - { - match child.stdin { - Some(ref mut stdin) => { - write!(stdin, "{}", input); + .spawn_async() + .expect("could not spawn child"); + match child.stdin() { + Some(ref mut stdin) => { + println!("writing input: '{}'", input); + write!(stdin, "{}", input); + } + None => (), + }; + let result = child + .wait_with_output() + .and_then(|output| { + if output.status.success() { + future::ok(output) + } else { + // TODO: change this + future::ok(output) } - None => return future::err(err_msg("done fucked")), - }; - } - let output = child.wait_with_output().unwrap(); - if !output.status.success() { - // TODO: get rid of unwraps - return future::err(err_msg(format!( - "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - path, - output.status, - String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - ))); - } - output + }).map(|output| { + let stdout = + String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + let stderr = + String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + println!("stdout: {}, stderr: {}", stdout, stderr); + json!({ + "stdout": stdout, + "stderr": stderr, + }) + }).map_err(|err| err_msg(format!("could not get output: {}", err))); + Box::new(result) + // .and_then(move |output| { + // if output.status.success() { + // future::ok(output) + // } else { + // // TODO: get rid of unwraps + // future::err(err_msg(format!( + // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // path, + // output.status, + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // ))) + // } + // }).map(|output| { + // let stdout = + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + // let stderr = + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + // // future::ok(json!({ + // // "stdout": stdout, + // // "stderr": stderr, + // // })) + // json!("") + // }).map_err(|err| err_msg(format!("could not get output: {}", err))) + + // let result = child + // .spawn_async() + // .expect("could not spawn child") + // .map_err(|err| err_msg(format!("failed to get output: {}", err))) + // .and_then(|child| { + // match child.stdin() { + // Some(ref mut stdin) => { + // // future::result(write!(stdin, "{}", input)) + // future::ok(0) + // } + // None => future::err(err_msg("done fucked")), + // } + // }); + + // let output = child.wait_with_output().unwrap(); + // if !output.status.success() { + // // TODO: get rid of unwraps + // return future::err(err_msg(format!( + // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // path, + // output.status, + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // ))); + // } + // output } }; + output.map(|x| (temp_path, x)) - let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); - let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - future::ok(( - temp_path, - json!({ - "stdout": stdout, - "stderr": stderr, - }), - )) + // let result = match action { + // Action::Command(ref cmd) => { + // // TODO: allow some kind of simple variable replacement + // let child = Command::new("/bin/bash") + // .current_dir(&temp_path) + // .env("DIP_ROOT", "lol") + // .env("DIP_WORKDIR", &temp_path) + // .arg("-c") + // .arg(cmd) + // .stdin(Stdio::piped()) + // .stdout(Stdio::piped()) + // .stderr(Stdio::piped()); + // child.output_async().map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|output| { + // if output.status.success() { + // future::ok(output) + // } else { + // // TODO: get rid of unwraps + // future::err(err_msg(format!( + // "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // cmd, + // output.status, + // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // ))) + // } + // }) + // } + // Action::Exec(ref path) => { + // let mut child = Command::new(&path) + // .current_dir(&temp_path) + // .env("DIP_ROOT", "") + // .env("DIP_WORKDIR", &temp_path) + // .arg("--config") + // .arg(config) + // .stdin(Stdio::piped()) + // .stdout(Stdio::piped()) + // .stderr(Stdio::piped()); + // child + // .spawn_async() + // .map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|child| { + // future::ok() + // }) + // // { + // // match child.stdin { + // // Some(ref mut stdin) => { + // // write!(stdin, "{}", input); + // // } + // // None => return future::err(err_msg("done fucked")), + // // }; + // // } + // // let output = child.wait_with_output().unwrap(); + // // if !output.status.success() { + // // // TODO: get rid of unwraps + // // return future::err(err_msg(format!( + // // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", + // // path, + // // output.status, + // // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), + // // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) + // // ))); + // // } + // // output + // } + // }; + + // let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); + // let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); + // future::ok(( + // temp_path, + // json!({ + // "stdout": stdout, + // "stderr": stderr, + // }), + // )) } } diff --git a/src/hook.rs b/src/hook.rs index 4158349..7cd897a 100644 --- a/src/hook.rs +++ b/src/hook.rs @@ -6,36 +6,37 @@ use std::sync::Arc; use failure::{err_msg, Error}; use futures::{future, stream, Future}; -use owning_ref::BoxRef; use serde_json::Value as JsonValue; use tokio::{self, prelude::*}; use toml::Value; use Handler; +use HANDLERS; pub struct Hook { name: String, - handlers: Arc>, + handlers: Vec, } impl Hook { pub fn from(name: impl Into, config: &Value) -> Result { let name = name.into(); - let handlers = Arc::new(config + let handlers = config .get("handlers") .ok_or(err_msg("No 'handlers' found."))? .as_array() .ok_or(err_msg("'handlers' is not an array."))? .iter() .map(|value: &Value| Handler::from(value)) - .collect::, _>>()?); + .collect::, _>>()?; Ok(Hook { name, handlers }) } pub fn from_file

(path: P) -> Result where P: AsRef, { - let filename = path.as_ref() + let filename = path + .as_ref() .file_name() .ok_or(err_msg("what the fuck bro"))? .to_str() @@ -53,18 +54,35 @@ impl Hook { self.name.clone() } pub fn handle(&self, req: JsonValue, temp_path: PathBuf) -> Result { - let h = self.handlers.clone(); - let it = h.iter(); - let s = stream::iter_ok(it).fold((temp_path, req), move |prev, handler| { - let (path, prev) = prev; - handler.run(path, prev) - }); - /*.fold(future::ok(req), |prev, handler| { - prev.and_then(|val| handler.run(temp_path, val)) - });*/ - let s = s.map(|_| ()).map_err(|_: Error| ()); - tokio::executor::spawn(s); + let handlers = self + .handlers + .iter() + .map(|handler| (handler.config.clone(), handler.action.clone())) + .collect::>(); + let st = stream::iter_ok::<_, Error>(handlers.into_iter()) + .fold((temp_path, req), |(path, prev), (config, action)| { + println!("executing in {:?}", &path); + Handler::run(config, action, path, prev) + }).map(|_| ()) + .map_err(|_: Error| ()); + tokio::executor::spawn(st); + // let it = stream::iter_ok::<_, Error>(handlers.iter()); + // let s = it.fold((temp_path, req), move |(path, prev), handler| { + // let result = handler.run(path.clone(), prev.clone()); + // result + // }).map(|_|()).map_err(|_| ()); + // tokio::executor::spawn(s); Ok("success".to_owned()) + // let s = stream::iter_ok(self.handlers.iter()).fold((temp_path, req), move |prev, handler| { + // let (path, prev) = prev; + // handler.run(path, prev) + // }); + // .fold(future::ok(req), |prev, handler| { + // prev.and_then(|val| handler.run(temp_path, val)) + // }); + // let s = s.map(|_| ()).map_err(|_: Error| ()); + // tokio::executor::spawn(s); + // Ok("success".to_owned()) /* Ok(self.iter() .fold(Ok(req), |prev, handler| { @@ -85,6 +103,5 @@ impl Hook { ) }) .unwrap_or_else(|err| (StatusCode::BAD_REQUEST, format!("Error: {:?}", err)))) - */ - } + */ } } diff --git a/src/lib.rs b/src/lib.rs index c60aa99..ff1e795 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ extern crate mktemp; extern crate owning_ref; extern crate serde; extern crate tokio; +extern crate tokio_process; #[macro_use] extern crate serde_json; #[macro_use] @@ -48,7 +49,8 @@ const URIPATTERN_STR: &str = r"/webhook/(?P[A-Za-z._][A-Za-z0-9._]*)"; lazy_static! { static ref URIPATTERN: Regex = Regex::new(URIPATTERN_STR).unwrap(); - // static ref HANDLERS: Mutex>> = Mutex::new(HashMap::new()); + static ref HANDLERS: Arc>> = + Arc::new(Mutex::new(HashMap::new())); static ref PROGRAMS: Mutex> = Mutex::new(HashMap::new()); static ref HOOKS: Arc>> = Arc::new(Mutex::new(HashMap::new())); } diff --git a/src/service.rs b/src/service.rs index 9e60c8e..9ec7803 100644 --- a/src/service.rs +++ b/src/service.rs @@ -34,51 +34,41 @@ pub fn dip_service(req: Request) -> Box, Erro // TODO: filter by method as well - let headers = req.headers() + let headers = req + .headers() .clone() .into_iter() .filter_map(|(k, v)| { let key = k.unwrap().as_str().to_owned(); v.to_str().map(|value| (key, value.to_owned())).ok() - }) - .collect::>(); + }).collect::>(); let method = req.method().as_str().to_owned(); // spawn job - thread::spawn(move || { - req.into_body().concat2().map(move |body| { - let body = String::from_utf8(body.to_vec()).unwrap(); - let req_obj = json!({ - "body": body, - "headers": headers, - "method": method, - }); - let hooks = HOOKS.lock().unwrap(); - { - let mut temp_dir = Temp::new_dir().unwrap(); - let temp_path = temp_dir.to_path_buf(); - assert!(temp_path.exists()); + Box::new(req.into_body().concat2().map(move |body| { + let body = String::from_utf8(body.to_vec()).unwrap(); + let req_obj = json!({ + "body": body, + "headers": headers, + "method": method, + }); + let hooks = HOOKS.lock().unwrap(); + { + let mut temp_dir = Temp::new_dir().unwrap(); + let temp_path = temp_dir.to_path_buf(); + assert!(temp_path.exists()); - let hook = hooks.get(&name).unwrap(); - let (code, msg) = match hook.handle(req_obj, temp_path) { - Ok(msg) => (StatusCode::ACCEPTED, msg), - Err(msg) => (StatusCode::BAD_REQUEST, msg), - }; + let hook = hooks.get(&name).unwrap(); + let (code, msg) = match hook.handle(req_obj, temp_path) { + Ok(msg) => (StatusCode::ACCEPTED, msg), + Err(msg) => (StatusCode::BAD_REQUEST, msg), + }; - temp_dir.release(); - Response::builder() - .status(code) - .body(Body::from(msg)) - .unwrap_or_else(|err| Response::new(Body::from(format!("{}", err)))) - } - }) - }); - - Box::new(future::ok( - Response::builder() - .status(StatusCode::ACCEPTED) - // TODO: assign job a uuid and do some logging - .body(Body::from(format!("job {} started", -1))) - .unwrap(), - )) + temp_dir.release(); + Response::builder() + .status(code) + .body(Body::from(msg)) + .unwrap_or_else(|err| Response::new(Body::from(format!("{}", err)))) + } + })) } From 34eae4569f4fae496781462e0b21dd6165b27bb9 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Wed, 22 Aug 2018 21:30:55 -0500 Subject: [PATCH 4/6] it works i guess? --- examples/github.rs | 4 +-- src/handler.rs | 86 ++++++++++++++++++++++++++++++++-------------- src/hook.rs | 5 +-- src/lib.rs | 5 +-- src/service.rs | 1 - 5 files changed, 64 insertions(+), 37 deletions(-) diff --git a/examples/github.rs b/examples/github.rs index 0b933f0..6e4452d 100644 --- a/examples/github.rs +++ b/examples/github.rs @@ -17,7 +17,7 @@ use std::iter::FromIterator; use std::path::PathBuf; use std::process::Command; -use failure::{err_msg, Error}; +use failure::err_msg; use generic_array::GenericArray; use hmac::{Hmac, Mac}; use secstr::*; @@ -63,7 +63,6 @@ fn default_path() -> PathBuf { fn main() { let args = Opt::from_args(); let config: Config = serde_json::from_str(&args.config).expect("Could not parse config."); - println!("{:?}", config); let mut payload = String::new(); io::stdin() @@ -71,6 +70,7 @@ fn main() { .expect("Could not read from stdin"); let payload: Payload = serde_json::from_str(&payload) .expect(&format!("Could not parse stdin into json: '{}'", payload)); + println!("Read body: '{}'", payload.body); if !config.disable_hmac_verify { let secret = GenericArray::from_iter(config.secret.bytes()); diff --git a/src/handler.rs b/src/handler.rs index 76b4ce8..2e9051c 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,14 +1,15 @@ -use std::io::Write; use std::path::PathBuf; -use std::process::{Command, Output, Stdio}; +use std::process::{Command, Stdio}; use failure::{err_msg, Error}; use futures::{ - future::{self, FutureResult}, + future::{self, Either, FutureResult}, + sink::Sink, Future, }; use serde::Serialize; use serde_json::{Serializer as JsonSerializer, Value as JsonValue}; +use tokio::io::write_all; use tokio_process::CommandExt; use toml::Value as TomlValue; @@ -113,39 +114,72 @@ impl Handler { .env("DIP_WORKDIR", &temp_path) .arg("--config") .arg(config) - .stdin(Stdio::piped()) + .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn_async() .expect("could not spawn child"); - match child.stdin() { - Some(ref mut stdin) => { - println!("writing input: '{}'", input); - write!(stdin, "{}", input); - } - None => (), - }; - let result = child - .wait_with_output() - .and_then(|output| { - if output.status.success() { - future::ok(output) - } else { - // TODO: change this - future::ok(output) - } - }).map(|output| { + + let stdin = child.stdin().take().unwrap(); + + let input = format!("{}", input); + let result = write_all(stdin, input) + .and_then(|_| child.wait_with_output()) + .map(|output| { let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - println!("stdout: {}, stderr: {}", stdout, stderr); json!({ - "stdout": stdout, - "stderr": stderr, - }) - }).map_err(|err| err_msg(format!("could not get output: {}", err))); + "stdout": stdout, + "stderr": stderr, + }) + }).map_err(|err| err_msg(format!("error: {}", err))); + + // let _result: Either<_, FutureResult<(), Error>> = { + // match child.stdin() { + // Some(ref mut stdin) => Either::A(write_all(stdin, input.as_bytes())), + // None => Either::B(future::err(err_msg("rip"))), + // } + // }; Box::new(result) + + // let input_s = format!("{}", input); + // let result: Box + Send> = { + // let rf = child.clone().lock().unwrap(); + // match rf.stdin() { + // Some(ref mut stdin) => Box::new( + // write_all(stdin, input_s.as_bytes()) + // .map(|_| ()) + // .map_err(|err| err_msg(format!("error: {}", err))), + // ), + // None => Box::new(future::err(err_msg("Failed to acquire child stdin"))), + // } + // }; + // { + // let rf = child.clone().lock().unwrap(); + // let result = rf + // .wait_with_output() + // .and_then(|output| { + // if output.status.success() { + // future::ok(output) + // } else { + // // TODO: change this + // future::ok(output) + // } + // }).map(|output| { + // let stdout = String::from_utf8(output.stdout) + // .unwrap_or_else(|_| String::new()); + // let stderr = String::from_utf8(output.stderr) + // .unwrap_or_else(|_| String::new()); + // println!("stdout: {}, stderr: {}", stdout, stderr); + // json!({ + // "stdout": stdout, + // "stderr": stderr, + // }) + // }).map_err(|err| err_msg(format!("could not get output: {}", err))); + // }); + // .and_then(move |output| { // if output.status.success() { // future::ok(output) diff --git a/src/hook.rs b/src/hook.rs index 7cd897a..26b2b84 100644 --- a/src/hook.rs +++ b/src/hook.rs @@ -1,17 +1,14 @@ use std::fs::File; use std::io::Read; use std::path::{Path, PathBuf}; -use std::slice::Iter; -use std::sync::Arc; use failure::{err_msg, Error}; -use futures::{future, stream, Future}; +use futures::{stream, Future}; use serde_json::Value as JsonValue; use tokio::{self, prelude::*}; use toml::Value; use Handler; -use HANDLERS; pub struct Hook { name: String, diff --git a/src/lib.rs b/src/lib.rs index ff1e795..81a1ed4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,5 @@ //! # Dip -#[macro_use] extern crate failure; extern crate futures; extern crate hyper; @@ -55,8 +54,6 @@ lazy_static! { static ref HOOKS: Arc>> = Arc::new(Mutex::new(HashMap::new())); } -// const NOTFOUND: &str = "

Looks like you took a wrong turn!

There's nothing to see here.

"; - fn watch

(root: P) -> notify::Result<()> where P: AsRef, @@ -72,7 +69,7 @@ where // TODO: don't do this config::load_config(root.as_ref()) } - Err(e) => println!("watch error: {:?}", e), + Err(e) => eprintln!("watch error: {:?}", e), } } } diff --git a/src/service.rs b/src/service.rs index 9ec7803..36fdbc4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::thread; use futures::{future, Future, Stream}; use hyper::{Body, Error, Request, Response, StatusCode}; From 7c1360d9b92ecc75f7ef87420c14f02d9c9adceb Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Wed, 22 Aug 2018 21:32:06 -0500 Subject: [PATCH 5/6] delete commented sections --- src/handler.rs | 169 ------------------------------------------------- 1 file changed, 169 deletions(-) diff --git a/src/handler.rs b/src/handler.rs index 2e9051c..bfc707f 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -136,178 +136,9 @@ impl Handler { }) }).map_err(|err| err_msg(format!("error: {}", err))); - // let _result: Either<_, FutureResult<(), Error>> = { - // match child.stdin() { - // Some(ref mut stdin) => Either::A(write_all(stdin, input.as_bytes())), - // None => Either::B(future::err(err_msg("rip"))), - // } - // }; Box::new(result) - - // let input_s = format!("{}", input); - // let result: Box + Send> = { - // let rf = child.clone().lock().unwrap(); - // match rf.stdin() { - // Some(ref mut stdin) => Box::new( - // write_all(stdin, input_s.as_bytes()) - // .map(|_| ()) - // .map_err(|err| err_msg(format!("error: {}", err))), - // ), - // None => Box::new(future::err(err_msg("Failed to acquire child stdin"))), - // } - // }; - // { - // let rf = child.clone().lock().unwrap(); - // let result = rf - // .wait_with_output() - // .and_then(|output| { - // if output.status.success() { - // future::ok(output) - // } else { - // // TODO: change this - // future::ok(output) - // } - // }).map(|output| { - // let stdout = String::from_utf8(output.stdout) - // .unwrap_or_else(|_| String::new()); - // let stderr = String::from_utf8(output.stderr) - // .unwrap_or_else(|_| String::new()); - // println!("stdout: {}, stderr: {}", stdout, stderr); - // json!({ - // "stdout": stdout, - // "stderr": stderr, - // }) - // }).map_err(|err| err_msg(format!("could not get output: {}", err))); - // }); - - // .and_then(move |output| { - // if output.status.success() { - // future::ok(output) - // } else { - // // TODO: get rid of unwraps - // future::err(err_msg(format!( - // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - // path, - // output.status, - // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - // ))) - // } - // }).map(|output| { - // let stdout = - // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); - // let stderr = - // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - // // future::ok(json!({ - // // "stdout": stdout, - // // "stderr": stderr, - // // })) - // json!("") - // }).map_err(|err| err_msg(format!("could not get output: {}", err))) - - // let result = child - // .spawn_async() - // .expect("could not spawn child") - // .map_err(|err| err_msg(format!("failed to get output: {}", err))) - // .and_then(|child| { - // match child.stdin() { - // Some(ref mut stdin) => { - // // future::result(write!(stdin, "{}", input)) - // future::ok(0) - // } - // None => future::err(err_msg("done fucked")), - // } - // }); - - // let output = child.wait_with_output().unwrap(); - // if !output.status.success() { - // // TODO: get rid of unwraps - // return future::err(err_msg(format!( - // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - // path, - // output.status, - // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - // ))); - // } - // output } }; output.map(|x| (temp_path, x)) - - // let result = match action { - // Action::Command(ref cmd) => { - // // TODO: allow some kind of simple variable replacement - // let child = Command::new("/bin/bash") - // .current_dir(&temp_path) - // .env("DIP_ROOT", "lol") - // .env("DIP_WORKDIR", &temp_path) - // .arg("-c") - // .arg(cmd) - // .stdin(Stdio::piped()) - // .stdout(Stdio::piped()) - // .stderr(Stdio::piped()); - // child.output_async().map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|output| { - // if output.status.success() { - // future::ok(output) - // } else { - // // TODO: get rid of unwraps - // future::err(err_msg(format!( - // "Command '{}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - // cmd, - // output.status, - // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - // ))) - // } - // }) - // } - // Action::Exec(ref path) => { - // let mut child = Command::new(&path) - // .current_dir(&temp_path) - // .env("DIP_ROOT", "") - // .env("DIP_WORKDIR", &temp_path) - // .arg("--config") - // .arg(config) - // .stdin(Stdio::piped()) - // .stdout(Stdio::piped()) - // .stderr(Stdio::piped()); - // child - // .spawn_async() - // .map_err(|err| err_msg(format!("failed to get output: {}", err))).and_then(|child| { - // future::ok() - // }) - // // { - // // match child.stdin { - // // Some(ref mut stdin) => { - // // write!(stdin, "{}", input); - // // } - // // None => return future::err(err_msg("done fucked")), - // // }; - // // } - // // let output = child.wait_with_output().unwrap(); - // // if !output.status.success() { - // // // TODO: get rid of unwraps - // // return future::err(err_msg(format!( - // // "'{:?}' returned with a non-zero status code: {}\nstdout:\n{}\nstderr:\n{}", - // // path, - // // output.status, - // // String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()), - // // String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()) - // // ))); - // // } - // // output - // } - // }; - - // let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); - // let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - // future::ok(( - // temp_path, - // json!({ - // "stdout": stdout, - // "stderr": stderr, - // }), - // )) } } From e57f8f1796e3846a52df515ba989957819775894 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Wed, 22 Aug 2018 21:45:11 -0500 Subject: [PATCH 6/6] works i guess --- examples/github.rs | 8 ++++++-- src/handler.rs | 24 +++++++++++++++--------- src/hook.rs | 43 ++++--------------------------------------- 3 files changed, 25 insertions(+), 50 deletions(-) diff --git a/examples/github.rs b/examples/github.rs index 6e4452d..5656e0e 100644 --- a/examples/github.rs +++ b/examples/github.rs @@ -70,7 +70,6 @@ fn main() { .expect("Could not read from stdin"); let payload: Payload = serde_json::from_str(&payload) .expect(&format!("Could not parse stdin into json: '{}'", payload)); - println!("Read body: '{}'", payload.body); if !config.disable_hmac_verify { let secret = GenericArray::from_iter(config.secret.bytes()); @@ -92,7 +91,12 @@ fn main() { let left = SecStr::from(format!("sha1={}", signature)); let right = SecStr::from(auth.bytes().collect::>()); - assert!(left == right, "HMAC signature didn't match",); + assert!( + left == right, + "HMAC signature didn't match: {} vs. {}", + signature, + auth + ); } let payload: GithubPayload = diff --git a/src/handler.rs b/src/handler.rs index bfc707f..d303f08 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -3,8 +3,7 @@ use std::process::{Command, Stdio}; use failure::{err_msg, Error}; use futures::{ - future::{self, Either, FutureResult}, - sink::Sink, + future::{self, Either}, Future, }; use serde::Serialize; @@ -69,7 +68,6 @@ impl Handler { temp_path: PathBuf, input: JsonValue, ) -> impl Future { - println!("Running: {:?} :: {:?}", config, action); let config = { let mut buf: Vec = Vec::new(); { @@ -125,16 +123,24 @@ impl Handler { let input = format!("{}", input); let result = write_all(stdin, input) .and_then(|_| child.wait_with_output()) - .map(|output| { + .map_err(|err| err_msg(format!("error: {}", err))) + .and_then(|output| { let stdout = String::from_utf8(output.stdout).unwrap_or_else(|_| String::new()); let stderr = String::from_utf8(output.stderr).unwrap_or_else(|_| String::new()); - json!({ - "stdout": stdout, - "stderr": stderr, - }) - }).map_err(|err| err_msg(format!("error: {}", err))); + if output.status.success() { + Either::A(future::ok(json!({ + "stdout": stdout, + "stderr": stderr, + }))) + } else { + Either::B(future::err(err_msg(format!( + "Failed, stdout: '{}', stderr: '{}'", + stdout, stderr + )))) + } + }); Box::new(result) } diff --git a/src/hook.rs b/src/hook.rs index 26b2b84..068454e 100644 --- a/src/hook.rs +++ b/src/hook.rs @@ -58,47 +58,12 @@ impl Hook { .collect::>(); let st = stream::iter_ok::<_, Error>(handlers.into_iter()) .fold((temp_path, req), |(path, prev), (config, action)| { - println!("executing in {:?}", &path); Handler::run(config, action, path, prev) }).map(|_| ()) - .map_err(|_: Error| ()); + .map_err(|err: Error| { + println!("Error from stream: {}", err); + }); tokio::executor::spawn(st); - // let it = stream::iter_ok::<_, Error>(handlers.iter()); - // let s = it.fold((temp_path, req), move |(path, prev), handler| { - // let result = handler.run(path.clone(), prev.clone()); - // result - // }).map(|_|()).map_err(|_| ()); - // tokio::executor::spawn(s); Ok("success".to_owned()) - // let s = stream::iter_ok(self.handlers.iter()).fold((temp_path, req), move |prev, handler| { - // let (path, prev) = prev; - // handler.run(path, prev) - // }); - // .fold(future::ok(req), |prev, handler| { - // prev.and_then(|val| handler.run(temp_path, val)) - // }); - // let s = s.map(|_| ()).map_err(|_: Error| ()); - // tokio::executor::spawn(s); - // Ok("success".to_owned()) - /* - Ok(self.iter() - .fold(Ok(req), |prev, handler| { - prev.and_then(|val| { - println!("Running {}...", handler.config()); - let result = handler.run(&temp_path, val); - result - }) - }) - .map(|res| { - ( - StatusCode::ACCEPTED, - format!( - "stdout:\n{}\n\nstderr:\n{}", - res.get("stdout").and_then(|v| v.as_str()).unwrap_or(""), - res.get("stderr").and_then(|v| v.as_str()).unwrap_or(""), - ), - ) - }) - .unwrap_or_else(|err| (StatusCode::BAD_REQUEST, format!("Error: {:?}", err)))) - */ } + } }