diff --git a/Cargo.toml b/Cargo.toml index 2e32824..f655d2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "factotum" -version = "0.4.1" +version = "0.5.0-rc1" authors = ["Ed Lewis ", "Josh Beemster "] [dependencies] diff --git a/factotum-server/.gitignore b/factotum-server/.gitignore new file mode 100644 index 0000000..043807d --- /dev/null +++ b/factotum-server/.gitignore @@ -0,0 +1,18 @@ +# Compiled files +*.o +*.so +*.rlib +*.dll + +# Executables +*.exe + +# Generated by Cargo +/target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here http://doc.crates.io/guide.html#cargotoml-vs-cargolock +Cargo.lock +.factotum +.vagrant + diff --git a/factotum-server/Cargo.toml b/factotum-server/Cargo.toml new file mode 100644 index 0000000..573ad3e --- /dev/null +++ b/factotum-server/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "factotum-server" +version = "0.1.0" +authors = ["Nicholas Ung "] + +[dependencies] +log = "0.3" +log4rs = "0.6" +docopt = "0.7" +getopts = "0.2" +chrono = { version = "0.3", features = ["serde"] } +lazy_static = "0.2" +regex = "0.2" +url = "1.4" +rust-crypto = "^0.2" +threadpool = "1.3" +iron = "0.5" +router = "0.5" +bodyparser = "0.6" +persistent = "0.3" +logger = "0.3" +rustc-serialize = "0.3" +serde = "0.9" +serde_derive = "0.9" +serde_json = "0.9" +consul = "0.0.6" +base64 = "0.4" diff --git a/factotum-server/src/factotum_server/command/mod.rs b/factotum-server/src/factotum_server/command/mod.rs new file mode 100644 index 0000000..009a2d3 --- /dev/null +++ b/factotum-server/src/factotum_server/command/mod.rs @@ -0,0 +1,75 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use std::collections::HashMap; +use std::process::Command; + +macro_rules! commands { + ($( $key: expr => $val: expr ),*) => {{ + let mut map = ::std::collections::HashMap::new(); + $( map.insert($key, $val); )* + ::factotum_server::command::CommandStore::new(map) + }} +} + +#[cfg(test)] +mod tests; + +pub trait Execution { + fn get_command(&self, command: &str) -> Result; + fn execute(&self, cmd_path: String, cmd_args: Vec) -> Result; +} + +#[derive(Clone, Debug)] +pub struct CommandStore { + pub command_map: HashMap +} + +impl CommandStore { + pub fn new(commands: HashMap) -> CommandStore { + CommandStore { + command_map: commands + } + } +} + +impl Execution for CommandStore { + fn get_command(&self, command: &str) -> Result { + match self.command_map.get(command) { + Some(command) => Ok(command.to_owned()), + None => Err(format!("Command <{}> not found in map.", command)) + } + } + + fn execute(&self, cmd_path: String, cmd_args: Vec) -> Result { + let command_str = format!("{} {}", cmd_path, cmd_args.join(" ")); + debug!("Executing: [{}]", command_str); + let failed_command_msg = format!("Failed to execute command: [{}]", command_str); + match Command::new(cmd_path) + .args(&cmd_args) + .output() + { + Ok(output) => { + if output.status.success() { + let stdout = String::from_utf8_lossy(&output.stdout); + Ok(stdout.into_owned()) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + Err(format!("{} - {}", failed_command_msg, stderr.into_owned())) + } + } + Err(e) => Err(format!("{} - {}", failed_command_msg, e)) + } + } +} diff --git a/factotum-server/src/factotum_server/command/tests.rs b/factotum-server/src/factotum_server/command/tests.rs new file mode 100644 index 0000000..6ccb110 --- /dev/null +++ b/factotum-server/src/factotum_server/command/tests.rs @@ -0,0 +1,48 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use super::*; + +#[test] +fn create_command_store_macro() { + let command_store = commands!["dummy".to_string() => "/tmp/fake_command".to_string()]; + assert_eq!(command_store.command_map.contains_key("dummy"), true); + assert_eq!(command_store.command_map.contains_key("other_dummy"), false); +} + +#[test] +fn command_store_get_command_success() { + let command_store = commands!["dummy".to_string() => "/tmp/fake_command".to_string()]; + assert_eq!(command_store.get_command("dummy"), Ok("/tmp/fake_command".to_string())); +} + +#[test] +fn command_store_get_command_error() { + let command_store = CommandStore::new(HashMap::new()); + assert_eq!(command_store.get_command("dummy"), Err("Command not found in map.".to_string())); +} + +#[test] +fn command_store_execute_fail() { + let command_store = commands!["dummy".to_string() => "/tmp/fake_command".to_string()]; + let output = command_store.execute("/tmp/fake_command".to_string(), vec!["--random_arg".to_string()]).unwrap_err(); + assert_eq!(output, "Failed to execute command: [/tmp/fake_command --random_arg] - No such file or directory (os error 2)"); +} + +#[test] +fn command_store_execute_illegal_option() { + let command_store = commands!["dummy".to_string() => "/tmp/fake_command".to_string()]; + let output = command_store.execute("pwd".to_string(), vec!["--random_arg".to_string()]).unwrap_err(); + assert_eq!(output, "Failed to execute command: [pwd --random_arg] - pwd: unrecognized option \'--random_arg\'\nTry \'pwd --help\' for more information.\n"); +} diff --git a/factotum-server/src/factotum_server/dispatcher/mod.rs b/factotum-server/src/factotum_server/dispatcher/mod.rs new file mode 100644 index 0000000..f128993 --- /dev/null +++ b/factotum-server/src/factotum_server/dispatcher/mod.rs @@ -0,0 +1,70 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +#[cfg(test)] +mod tests; + +use std::collections::VecDeque; +use std::sync::mpsc::Sender; +use factotum_server::server::JobRequest; +use factotum_server::responder::DispatcherStatus; + +#[derive(Debug, PartialEq)] +pub enum Dispatch { + StatusUpdate(Query), + CheckQueue(Query), + NewRequest(JobRequest), + ProcessRequest, + RequestComplete(JobRequest), + RequestFailure(JobRequest), + StopProcessing, +} + +#[derive(Debug)] +pub struct Dispatcher { + pub max_jobs: usize, + pub max_workers: usize, + pub requests_queue: VecDeque, +} + +impl Dispatcher { + pub fn new(queue_size: usize, workers_size: usize) -> Dispatcher { + Dispatcher { + max_jobs: if queue_size > 0 { queue_size } else { ::MAX_JOBS_DEFAULT }, + max_workers: if workers_size > 0 { workers_size } else { ::MAX_WORKERS_DEFAULT }, + requests_queue: VecDeque::with_capacity(queue_size), + } + } +} + +#[derive(Debug)] +pub struct Query { + pub name: String, + pub status_tx: Sender, +} + +impl Query { + pub fn new(name: String, status_tx: Sender) -> Query { + Query { + name: name, + status_tx: status_tx, + } + } +} + +impl PartialEq for Query { + fn eq(&self, other: &Query) -> bool { + self.name == other.name + } +} diff --git a/factotum-server/src/factotum_server/dispatcher/tests.rs b/factotum-server/src/factotum_server/dispatcher/tests.rs new file mode 100644 index 0000000..39d1708 --- /dev/null +++ b/factotum-server/src/factotum_server/dispatcher/tests.rs @@ -0,0 +1,24 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use super::*; + +#[test] +fn create_new_dispatcher() { + let dispatcher = Dispatcher::new(10, 2); + + assert_eq!(dispatcher.max_jobs, 10); + assert_eq!(dispatcher.max_workers, 2); + assert!(dispatcher.requests_queue.is_empty()); +} diff --git a/factotum-server/src/factotum_server/mod.rs b/factotum-server/src/factotum_server/mod.rs new file mode 100644 index 0000000..bcce794 --- /dev/null +++ b/factotum-server/src/factotum_server/mod.rs @@ -0,0 +1,242 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +#[macro_use] +pub mod command; +pub mod server; +pub mod dispatcher; +pub mod persistence; +pub mod responder; + +#[cfg(test)] +mod tests; + +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::str::FromStr; +use std::sync::{Mutex, RwLock}; +use std::sync::mpsc; +use std::sync::mpsc::{Sender, Receiver}; +use std::thread; +use std::thread::JoinHandle; +use iron::prelude::*; +use iron::typemap::Key; +use logger::Logger; +use persistent::{Read, State}; +use threadpool::ThreadPool; + +use Args; +use factotum_server::command::{CommandStore, Execution}; +use factotum_server::dispatcher::{Dispatch, Dispatcher, Query}; +use factotum_server::persistence::{Persistence, ConsulPersistence, JobState}; +use factotum_server::responder::{DispatcherStatus, JobStatus, WorkerStatus}; +use factotum_server::server::{ServerManager, JobRequest}; + +#[derive(Debug, Copy, Clone)] +pub struct Server; +impl Key for Server { + type Value = ServerManager; +} + +#[derive(Debug, Copy, Clone)] +pub struct Storage; +impl Key for Storage { + type Value = ConsulPersistence; +} + +#[derive(Debug, Copy, Clone)] +pub struct Paths; +impl Key for Paths { + type Value = RwLock; +} + +#[derive(Debug, Copy, Clone)] +pub struct Updates; +impl Key for Updates { + type Value = Mutex>; +} + +pub fn start(args: Args) { + let server = ServerManager::new(args.flag_ip, args.flag_port, args.flag_webhook, args.flag_no_colour); + let persistence = ConsulPersistence::new(args.flag_consul_name, args.flag_consul_ip, args.flag_consul_port, args.flag_consul_namespace); + let dispatcher = Dispatcher::new(args.flag_max_jobs, args.flag_max_workers); + let command_store = commands![::FACTOTUM.to_string() => args.flag_factotum_bin]; + + let address = SocketAddr::from_str(&format!("{}:{}", server.ip, server.port)).expect("Failed to parse socket address"); + + let (requests_channel, _, _) = trigger_worker_manager(dispatcher, persistence.clone(), &command_store).unwrap(); + + let router = router!( + index: get "/" => responder::api, + help: get "/help" => responder::api, + status: get "/status" => responder::status, + settings: post "/settings" => responder::settings, + submit: post "/submit" => responder::submit, + check: post "/check" => responder::check + ); + let (logger_before, logger_after) = Logger::new(None); + + let mut chain = Chain::new(router); + chain.link_before(logger_before); + chain.link(State::::both(server)); + chain.link(State::::both(persistence)); + chain.link(Read::::both(RwLock::new(command_store))); + chain.link(Read::::both(Mutex::new(requests_channel))); + chain.link_after(logger_after); + + match Iron::new(chain).http(address) { + Ok(listening) => { + let socket_addr = listening.socket; + let ip = socket_addr.ip(); + let port = socket_addr.port(); + let start_message = format!("Factotum Server version [{}] listening on [{}:{}]", ::VERSION, ip, port); + info!("{}", start_message); + println!("{}", start_message) + } + Err(e) => println!("Failed to start server - {}", e) + } +} + +// Concurrent dispatch + +pub fn trigger_worker_manager(dispatcher: Dispatcher, persistence: T, command_store: &CommandStore) -> Result<(Sender, JoinHandle, ThreadPool), String> { + let (tx, rx) = mpsc::channel(); + let primary_pool = ThreadPool::new_with_name("primary_pool".to_string(), dispatcher.max_workers); + + let join_handle = spawn_worker_manager(tx.clone(), rx, dispatcher.requests_queue, dispatcher.max_jobs, primary_pool.clone(), persistence, command_store.clone()); + + Ok((tx, join_handle, primary_pool)) +} + +fn spawn_worker_manager(job_requests_tx: Sender, job_requests_rx: Receiver, requests_queue: VecDeque, max_jobs: usize, primary_pool: ThreadPool, persistence: T, command_store: CommandStore) -> JoinHandle { + let mut requests_queue = requests_queue; + let mut is_processing = true; + thread::spawn(move || { + while is_processing { + let message = job_requests_rx.recv().expect("Error receiving message in channel"); + + match message { + Dispatch::StatusUpdate(query) => send_status_update(query, &mut requests_queue, max_jobs, primary_pool.clone()), + Dispatch::CheckQueue(query) => is_queue_full(query, &mut requests_queue, max_jobs), + Dispatch::NewRequest(request) => new_job_request(job_requests_tx.clone(), &mut requests_queue, primary_pool.clone(), persistence.clone(), request), + Dispatch::ProcessRequest => process_job_request(job_requests_tx.clone(), &mut requests_queue, primary_pool.clone(), persistence.clone(), command_store.clone()), + Dispatch::RequestComplete(request) => complete_job_request(job_requests_tx.clone(), persistence.clone(), request), + Dispatch::RequestFailure(request) => failed_job_request(job_requests_tx.clone(), persistence.clone(), request), + Dispatch::StopProcessing => is_processing = stop_processing(), + } + } + String::from("EXITING WORKER MANAGER") + }) +} + +fn send_status_update(query: Query, requests_queue: &mut VecDeque, max_jobs: usize, primary_pool: ThreadPool) { + let tx = query.status_tx; + let total_workers = primary_pool.max_count(); + let active_workers = primary_pool.active_count(); + let result = DispatcherStatus { + workers: WorkerStatus { + total: total_workers, + idle: total_workers - active_workers, + active: active_workers, + }, + jobs: JobStatus { + max_queue_size: max_jobs, + in_queue: requests_queue.len(), + fail_count: 0, + success_count: 0 + } + }; + tx.send(result).expect("Server status channel receiver has been deallocated"); +} + +fn is_queue_full(query: Query, requests_queue: &mut VecDeque, max_jobs: usize) { + let tx = query.status_tx; + let is_full = requests_queue.len() >= max_jobs; + tx.send(is_full).expect("Queue query channel receiver has been deallocated"); +} + +fn new_job_request(requests_channel: Sender, requests_queue: &mut VecDeque, primary_pool: ThreadPool, persistence: T, request: JobRequest) { + debug!("ADDING NEW JOB jobId:[{}]", request.job_id); + requests_queue.push_back(request.clone()); + // Create entry in persistence storage + persist_entry(&persistence, request.job_id.clone(), request.clone(), JobState::Queued); + // Check queue size - return error if limit exceeded (not important right now) + if primary_pool.active_count() < primary_pool.max_count() { + requests_channel.send(Dispatch::ProcessRequest).expect("Job requests channel receiver has been deallocated"); + } else { + info!("No threads available - waiting for a job to complete.") + } +} + +fn process_job_request(requests_channel: Sender, requests_queue: &mut VecDeque, primary_pool: ThreadPool, persistence: T, command_store: CommandStore) { + debug!("QUEUE SIZE = {}", requests_queue.len()); + match requests_queue.pop_front() { + Some(request) => { + primary_pool.execute(move || { + debug!("PROCESSING JOB REQ jobId:[{}]", request.job_id); + // Update status in persistence storage + persist_entry(&persistence, request.job_id.clone(), request.clone(), JobState::Working); + let cmd_path = match command_store.get_command(::FACTOTUM) { + Ok(path) => path, + Err(e) => { + error!("{}", e); + requests_channel.send(Dispatch::RequestFailure(request)).expect("Job requests channel receiver has been deallocated"); + return + } + }; + let mut cmd_args = vec!["run".to_string(), request.factfile_path.clone()]; + cmd_args.extend_from_slice(request.factfile_args.as_slice()); + match command_store.execute(cmd_path, cmd_args) { + Ok(output) => { + trace!("{}", output); + requests_channel.send(Dispatch::RequestComplete(request)).expect("Job requests channel receiver has been deallocated"); + }, + Err(e) => { + error!("{}", e); + requests_channel.send(Dispatch::RequestFailure(request)).expect("Job requests channel receiver has been deallocated"); + } + }; + }); + } + None => debug!("QUEUE EMPTY") + } +} + +fn complete_job_request(requests_channel: Sender, persistence: T, request: JobRequest) { + info!("COMPLETED JOB REQ jobId:[{}]", request.job_id); + // Update completion in persistence storage + persist_entry(&persistence, request.job_id.clone(), request, JobState::Done); + requests_channel.send(Dispatch::ProcessRequest).expect("Job requests channel receiver has been deallocated"); +} + +fn failed_job_request(requests_channel: Sender, persistence: T, request: JobRequest) { + error!("FAILED JOB REQ jobId:[{}]", request.job_id); + // Update failure in persistence storage + persist_entry(&persistence, request.job_id.clone(), request, JobState::Done); + requests_channel.send(Dispatch::ProcessRequest).expect("Job requests channel receiver has been deallocated"); +} + +fn stop_processing() -> bool { + info!("STOPPING"); + false +} + +fn persist_entry(persistence: &T, client_job_id: String, job_request: JobRequest, job_state: JobState) { + let output = persistence::set_entry(persistence, client_job_id.clone(), job_request, job_state.clone()); + if output { + debug!("Persist [{}]::[{}]", client_job_id, job_state); + } else { + error!("Persistence Error: Failed to update [{}] to [{}]", client_job_id, job_state); + } +} diff --git a/factotum-server/src/factotum_server/persistence/mod.rs b/factotum-server/src/factotum_server/persistence/mod.rs new file mode 100644 index 0000000..7d83963 --- /dev/null +++ b/factotum-server/src/factotum_server/persistence/mod.rs @@ -0,0 +1,149 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use std::fmt; +use std::panic; +use std::thread::Result as ThreadResult; +use consul::Client; +use serde_json; +use base64::decode; + +use factotum_server::server::JobRequest; + +#[cfg(test)] +mod tests; + +pub trait Persistence { + fn id(&self) -> &str; + fn set_key(&self, key: &str, value: &str) -> ThreadResult<()>; + fn get_key(&self, key: &str) -> ThreadResult>; + fn prepend_namespace(&self, key: &str) -> String; +} + +#[derive(Clone, Debug)] +pub struct ConsulPersistence { + server_id: String, + host: String, + port: u32, + namespace: String, +} + +impl ConsulPersistence { + pub fn new(wrapped_id: Option, wrapped_host: Option, wrapped_port: Option, wrapped_namespace: Option) -> ConsulPersistence { + ConsulPersistence { + server_id: if let Some(server_id) = wrapped_id { server_id } else { ::CONSUL_NAME_DEFAULT.to_string() }, + host: if let Some(host) = wrapped_host { host } else { ::CONSUL_IP_DEFAULT.to_string() }, + port: if let Some(port) = wrapped_port { port } else { ::CONSUL_PORT_DEFAULT }, + namespace: if let Some(namespace) = wrapped_namespace { namespace } else { ::CONSUL_NAMESPACE_DEFAULT.to_string() }, + } + } + + fn client(&self) -> Client { + Client::new(&format!("{}:{}", self.host.clone(), self.port.clone())) + } +} + +impl Persistence for ConsulPersistence { + fn id(&self) -> &str { + &self.server_id + } + + fn set_key(&self, key: &str, value: &str) -> ThreadResult<()> { + panic::catch_unwind(|| { + self.client().keystore.set_key(key.to_owned(), value.to_owned()) + }) + } + + fn get_key(&self, key: &str) -> ThreadResult> { + panic::catch_unwind(|| { + self.client().keystore.get_key(key.to_owned()) + }) + } + + fn prepend_namespace(&self, job_ref: &str) -> String { + format!("{}/{}", &self.namespace, job_ref) + } +} + +pub fn set_entry(persistence: &T, job_ref: String, job_request: JobRequest, state: JobState) -> bool +{ + let job_entry = JobEntry::new(state, job_request, persistence.id()); + let job_entry_json = serde_json::to_string(&job_entry).expect("JSON compact encode error"); + + let job_key = persistence.prepend_namespace(&job_ref); + let result = persistence.set_key(&job_key, &job_entry_json); + + match result { + Ok(_) => true, + Err(_) => { + error!("Persistence Error: could not set K/V: {}::{}", job_key, job_entry_json); + false + }, + } +} + +pub fn get_entry(persistence: &T, job_ref: String) -> Option { + let job_key = persistence.prepend_namespace(&job_ref); + let result = persistence.get_key(&job_key); + + let keystore_val = match result { + Ok(state) => state, + Err(_) => { + error!("Persistence Error: could not get key: {}", job_ref); + None + }, + }; + + // decode base64 string + // deserialize to JobEntry + if let Some(base64_str) = keystore_val { + let decode_result = &decode(&base64_str).expect("Base64 string decode error"); + let raw_value = ::std::str::from_utf8(decode_result).expect("Error converting from bytes to string"); + let job_entry: JobEntry = serde_json::from_str(raw_value).expect("JSON decode error"); + Some(job_entry) + } else { + None + } +} + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub enum JobState { + Queued, + Working, + Done, +} + +impl fmt::Display for JobState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JobEntry { + pub state: JobState, + pub job_request: JobRequest, + pub last_run_from: String, +} + +impl JobEntry { + fn new(state: JobState, request: JobRequest, server_id: &str) -> JobEntry { + JobEntry { + state: state, + job_request: request, + last_run_from: server_id.to_owned(), + } + } +} diff --git a/factotum-server/src/factotum_server/persistence/tests.rs b/factotum-server/src/factotum_server/persistence/tests.rs new file mode 100644 index 0000000..c4e41b6 --- /dev/null +++ b/factotum-server/src/factotum_server/persistence/tests.rs @@ -0,0 +1,132 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use super::*; +use std::cell::RefCell; +use std::collections::HashMap; + +#[derive(Debug)] +struct GoodPersistenceMock { + id: String, + ref_map: RefCell>, +} + +impl GoodPersistenceMock { + fn new(id: &str) -> Self { + GoodPersistenceMock { + id: id.to_owned(), + ref_map: RefCell::new(HashMap::new()), + } + } +} + +impl Persistence for GoodPersistenceMock { + fn id(&self) -> &str { + &self.id + } + + fn set_key(&self, key: &str, value: &str) -> ThreadResult<()> { + let mut map = self.ref_map.borrow_mut(); + map.insert(key.to_owned(), value.to_owned()); + Ok(()) + } + + fn get_key(&self, key: &str) -> ThreadResult> { + let map = self.ref_map.borrow(); + let value = map.get(key); + Ok(value.map(|s| s.to_owned())) + } + + fn prepend_namespace(&self, key: &str) -> String { + format!("{}/{}", "com.test/namespace", key) + } +} + +#[derive(Debug)] +struct BadPersistenceMock; + +impl Persistence for BadPersistenceMock { + fn id(&self) -> &str { + "something_bad" + } + + fn set_key(&self, _: &str, _: &str) -> ThreadResult<()> { + Err(Box::new("setting key bad")) + } + + fn get_key(&self, _: &str) -> ThreadResult> { + Err(Box::new("getting key bad")) + } + + fn prepend_namespace(&self, key: &str) -> String { + key.to_string() + } +} + +#[test] +fn set_entry_fail_error() { + let persistence = BadPersistenceMock; + let request = JobRequest::new("", "dummy", "/fake/path", vec![]); + + let result = set_entry(&persistence, "fake_entry".to_string(), request.clone(), JobState::Queued); + + assert_eq!(false, result); +} + +#[test] +fn set_entry_new_success() { + let persistence = GoodPersistenceMock::new("test_set"); + let request = JobRequest::new("", "dummy", "/fake/path", vec![]); + + let result = set_entry(&persistence, "fake_entry".to_string(), request.clone(), JobState::Queued); + + let borrowed = &persistence.ref_map.borrow(); + let entry = borrowed.get("com.test/namespace/fake_entry").unwrap(); + let job_entry: JobEntry = serde_json::from_str(entry).expect("JSON decode error"); + + assert_eq!(true, result); + assert_eq!(JobState::Queued, job_entry.state); + assert_eq!("test_set".to_string(), job_entry.last_run_from); + assert_eq!(request, job_entry.job_request); +} + +#[test] +fn get_entry_fail_none() { + let persistence = BadPersistenceMock; + + let result = get_entry(&persistence, "fake_entry".to_string()); + + assert_eq!(None, result); +} + +#[test] +fn get_entry_success_key() { + use base64::encode; + + let persistence = GoodPersistenceMock::new("test_get"); + let request = JobRequest::new("", "dummy", "/fake/path", vec![]); + let job_entry = JobEntry::new(JobState::Queued, request.clone(), persistence.id()); + let job_entry_json = serde_json::to_string(&job_entry).expect("JSON compact encode error"); + let encoded_entry = encode(job_entry_json.as_bytes()); + { + let mut map = persistence.ref_map.borrow_mut(); + map.insert("com.test/namespace/dummy_entry".to_string(), encoded_entry); + } + + let result = get_entry(&persistence, "dummy_entry".to_string()).unwrap(); + + assert_eq!(JobState::Queued, result.state); + assert_eq!("test_get".to_string(), result.last_run_from); + assert_eq!(request, result.job_request); +} diff --git a/factotum-server/src/factotum_server/responder/mod.rs b/factotum-server/src/factotum_server/responder/mod.rs new file mode 100644 index 0000000..346a607 --- /dev/null +++ b/factotum-server/src/factotum_server/responder/mod.rs @@ -0,0 +1,338 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use std::collections::HashMap; +use std::error::Error; +use std::ops::{Deref, DerefMut}; +use std::sync::mpsc; +use std::sync::mpsc::Sender; +use iron::mime::*; +use iron::prelude::*; +use iron::status; +use iron::status::Status; +use url::Url; +use bodyparser; +use persistent::{Read, State}; +use serde::Serialize; +use serde_json; + +use factotum_server::{Paths, Server, Storage, Updates}; +use factotum_server::command::Execution; +use factotum_server::dispatcher::{Dispatch, Query}; +use factotum_server::persistence; +use factotum_server::persistence::{Persistence, JobState}; +use factotum_server::server::{ServerManager, SettingsRequest, JobRequest}; + +#[cfg(test)] +mod tests; + +const JSON_CONTENT_TYPE: &'static str = "application/json; charset=UTF-8"; + +// JSON Response Structs + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +struct ResponseMessage { + message: String +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +struct FactotumServerStatus { + version: VersionStatus, + server: ServerStatus, + dispatcher: DispatcherStatus, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +struct VersionStatus { + executor: String +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +struct ServerStatus { + start_time: String, + up_time: String, + state: String +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DispatcherStatus { + pub workers: WorkerStatus, + pub jobs: JobStatus, +} + +#[derive(Debug,PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerStatus { + pub total: usize, + pub idle: usize, + pub active: usize, +} + +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JobStatus { + pub max_queue_size: usize, + pub in_queue: usize, + pub fail_count: usize, + pub success_count: usize, +} + +// Response handlers + +pub fn api(request: &mut Request) -> IronResult { + let url: Url = request.url.clone().into(); + let response = get_help_message(); + return_json(status::Ok, encode(url, response)) +} + +pub fn status(request: &mut Request) -> IronResult { + let url: Url = request.url.clone().into(); + let rwlock = request.get::>().unwrap(); + let reader = rwlock.read().unwrap(); + let server_manager = reader.deref(); + let mutex = request.get::>().unwrap(); + let jobs_channel = mutex.try_lock().unwrap(); + + let response = get_server_status(server_manager, jobs_channel.clone()); + return_json(status::Ok, encode(url, response)) +} + +pub fn settings(request: &mut Request) -> IronResult { + let url: Url = request.url.clone().into(); + let request_body = request.get::>(); + let rwlock = request.get::>().unwrap(); + let mut server = rwlock.write().unwrap(); + + let (status, response) = process_settings(url, request_body, server.deref_mut()); + return_json(status, response) +} + +pub fn submit(request: &mut Request) -> IronResult { + let url: Url = request.url.clone().into(); + let request_body = request.get::>(); + let server_rwlock = request.get::>().unwrap(); + let server = server_rwlock.write().unwrap(); + let storage_rwlock = request.get::>().unwrap(); + let persistence = storage_rwlock.write().unwrap(); + let command_store_rwlock = request.get::>().unwrap(); + let command_store = command_store_rwlock.read().unwrap(); + let sender_mutex = request.get::>().unwrap(); + let jobs_channel = sender_mutex.try_lock().unwrap(); + + let (status, response) = process_submission(url, request_body, server.deref(), persistence.deref(), command_store.deref(), jobs_channel.deref()); + return_json(status, response) +} + +pub fn check(request: &mut Request) -> IronResult { + let url: Url = request.url.clone().into(); + let response = ResponseMessage { message: "check".to_string() }; + return_json(status::Ok, encode(url, &response)) +} + +// Helpers + +fn get_help_message() -> serde_json::Value { + json!( + { + "/help": { + "function": "Returns this message!", + "params": "pretty=1" + }, + "/status": { + "function": "Returns general information about the server and host system.", + "params": "pretty=1" + }, + "/settings": { + "function": "Updates settings within the server.", + "body": { + "state": "run|drain" + }, + "params": "pretty=1" + }, + "/submit": { + "function": "Submits a job to the queue.", + "body": { + "jobName": "com.acme-main", + "factfilePath": "/com.acme-main/factfile", + "factfileArgs": "[ --start step-2 ]" + }, + "params": "pretty=1" + }, + "/check": { + "function": "Fetches the state of a job by the ID.", + "params": "pretty=1, id=[id string]" + } + } + ) +} + +fn get_server_status(server: &ServerManager, jobs_channel: Sender) -> FactotumServerStatus { + let (tx, rx) = mpsc::channel(); + jobs_channel.send(Dispatch::StatusUpdate(Query::new("status_query".to_string(), tx))).expect("Job requests channel receiver has been deallocated"); + let dispatcher_status = rx.recv().expect("Server status senders have been disconnected"); + + FactotumServerStatus { + version: VersionStatus { + executor: ::VERSION.to_string() + }, + server: ServerStatus { + start_time: server.get_start_time(), + up_time: server.get_uptime(), + state: server.state.to_string() + }, + dispatcher: dispatcher_status, + } +} + +fn process_settings(url: Url, request_body: Result, bodyparser::BodyError>, server: &mut ServerManager) -> (Status, String) { + // get body + let settings = match request_body { + Ok(Some(decoded_settings)) => decoded_settings, + Ok(None) => { + return (status::BadRequest, create_warn_response(url, "Error: No body found in POST request")) + }, + Err(e) => { + return (status::BadRequest, create_warn_response(url, &format!("Error decoding JSON string: {}", e.cause().unwrap()))) + } + }; + + // validate settings request + let validated_settings = match SettingsRequest::validate(settings) { + Ok(validated_settings) => validated_settings, + Err(e) => { + return (status::BadRequest, create_warn_response(url, &format!("{}", e))) + } + }; + + // update server state + server.state = validated_settings.state.to_string(); + (status::Ok, create_ok_response(url, &format!("Update acknowledged: [state: {}]", server.state))) +} + +fn process_submission(url: Url, request_body: Result, bodyparser::BodyError>, server: &ServerManager, persistence: &T, command_store: &U, jobs_channel: &Sender) -> (Status, String) where + T: Persistence, + U: Execution { + // get body + let job_request = match request_body { + Ok(Some(decoded_job_request)) => decoded_job_request, + Ok(None) => { + return (status::BadRequest, create_warn_response(url, "Error: No body found in POST request")) + }, + Err(e) => { + return (status::BadRequest, create_warn_response(url, &format!("Error decoding JSON string: {}", e.cause().unwrap()))) + } + }; + + // check state + if !server.is_running() { + return (status::BadRequest, create_warn_response(url, &format!("Server in [{}] state - cannot submit job", server.state))) + } + + // validate job request + let mut validated_job_request = match JobRequest::validate(job_request, command_store) { + Ok(validated_job_request) => validated_job_request, + Err(e) => { + return (status::BadRequest, create_warn_response(url, &format!("{}", e))) + } + }; + + if job_will_be_run(persistence, &mut validated_job_request) { + return (status::BadRequest, create_warn_response(url, "Job has already been run")) + } + + // check queue size + if is_requests_queue_full(jobs_channel.clone()) { + return (status::BadRequest, create_warn_response(url, "Queue is full, cannot add job")) + } + + // append args + JobRequest::append_job_args(&server.deref(), &mut validated_job_request); + let job_id = validated_job_request.job_id.clone(); + jobs_channel.send(Dispatch::NewRequest(validated_job_request)).expect("Job requests channel receiver has been deallocated"); + (status::Ok, create_ok_response(url, &format!("SUBMITTING JOB REQ jobId:[{}]", job_id))) +} + +fn job_will_be_run(persistence: &T, job_request: &mut JobRequest) -> bool { + // check if job is running + // if NONE -> continue + // elif SOME && state == done -> continue + // else FAIL + let mut is_running = false; + match persistence::get_entry(persistence, job_request.job_id.clone()) { + Some(job_entry) => { + debug!("Job Entry '{}' state='{}'", job_entry.job_request.job_id, job_entry.state); + if job_entry.state != JobState::Done { + is_running = true; + } + }, + None => { + debug!("No state found for Job Entry '{}'", job_request.job_id); + }, + }; + is_running +} + +fn is_requests_queue_full(jobs_channel: Sender) -> bool { + let (tx, rx) = mpsc::channel(); + jobs_channel.send(Dispatch::CheckQueue(Query::new("queue_query".to_string(), tx))).expect("Job requests channel receiver has been deallocated"); + rx.recv().expect("Queue query senders have been disconnected") +} + +fn get_query_map(url: Url) -> HashMap { + let parser = url.query_pairs().into_owned(); + parser.collect() +} + +fn encode_compact(message: T) -> String { + serde_json::to_string(&message).expect("JSON compact encode error") +} + +fn encode_pretty(message: T) -> String { + serde_json::to_string_pretty(&message).expect("JSON pretty encode error") +} + +fn encode(url: Url, message: T) -> String { + let query_map = get_query_map(url); + if let Some(pretty) = query_map.get("pretty") { + if pretty == "1" { + return encode_pretty(message) + } + } + encode_compact(message) +} + +fn create_response(url: Url, message: &str) -> String { + let response = ResponseMessage { message: message.to_string() }; + encode(url, &response) +} + +fn create_ok_response(url: Url, message: &str) -> String { + info!("{}", message); + create_response(url, message) +} + +fn create_warn_response(url: Url, message: &str) -> String { + warn!("{}", message); + create_response(url, message) +} + +fn return_json(code: Status, response: String) -> IronResult { + let content_type = JSON_CONTENT_TYPE.parse::().unwrap(); + Ok(Response::with((content_type, code, response))) +} diff --git a/factotum-server/src/factotum_server/responder/tests.rs b/factotum-server/src/factotum_server/responder/tests.rs new file mode 100644 index 0000000..6ff3d15 --- /dev/null +++ b/factotum-server/src/factotum_server/responder/tests.rs @@ -0,0 +1,165 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use super::*; +use factotum_server::persistence::ConsulPersistence; +use factotum_server::command::Execution; + +#[test] +fn process_settings_fail_no_body() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(None); + let mut server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + + let (status, response) = process_settings(url, request_body, &mut server_manager); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Error: No body found in POST request"}"#); +} + +#[test] +fn process_settings_fail_invalid_json() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Err(bodyparser::BodyError{ + detail: "dummy error".to_string(), + cause: bodyparser::BodyErrorCause::IoError(::std::io::Error::new(::std::io::ErrorKind::Other, "bad stuff")), + }); + let mut server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + + let (status, response) = process_settings(url, request_body, &mut server_manager); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Error decoding JSON string: bad stuff"}"#); +} + +#[test] +fn process_settings_fail_invalid_settings_request() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(Some(SettingsRequest::new("INVALID"))); + let mut server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + + let (status, response) = process_settings(url, request_body, &mut server_manager); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Validation Error: Invalid 'state', must be one of (run|drain)"}"#); +} + +#[test] +fn process_settings_success() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(Some(SettingsRequest::new("drain"))); + let mut server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + + assert_eq!(server_manager.state, ::SERVER_STATE_RUN); + + let (status, response) = process_settings(url, request_body, &mut server_manager); + + assert_eq!(server_manager.state, ::SERVER_STATE_DRAIN); + assert_eq!(status, status::Ok); + assert_eq!(response, r#"{"message":"Update acknowledged: [state: drain]"}"#); +} + +#[test] +fn process_submission_fail_no_body() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(None); + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + let persistence = ConsulPersistence::new(None, None, None, None); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_command".to_string()]; + let (tx, _) = mpsc::channel(); + + let (status, response) = process_submission(url, request_body, &server_manager, &persistence, &command_store, &tx); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Error: No body found in POST request"}"#); +} + +#[test] +fn process_submission_fail_invalid_json() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Err(bodyparser::BodyError{ + detail: "dummy error".to_string(), + cause: bodyparser::BodyErrorCause::IoError(::std::io::Error::new(::std::io::ErrorKind::Other, "bad stuff")), + }); + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + let persistence = ConsulPersistence::new(None, None, None, None); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_command".to_string()]; + let (tx, _) = mpsc::channel(); + + let (status, response) = process_submission(url, request_body, &server_manager, &persistence, &command_store, &tx); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Error decoding JSON string: bad stuff"}"#); +} + +#[test] +fn process_submission_fail_server_in_drain_state() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(Some(JobRequest::new("1", "dummy", "/tmp/somewhere", vec!["--first-arg".to_string()]))); + let mut server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + let persistence = ConsulPersistence::new(None, None, None, None); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_command".to_string()]; + let (tx, _) = mpsc::channel(); + + server_manager.state = ::SERVER_STATE_DRAIN.to_string(); + let (status, response) = process_submission(url, request_body, &server_manager, &persistence, &command_store, &tx); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Server in [drain] state - cannot submit job"}"#); +} + +#[test] +fn process_submission_fail_invalid_job_request() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(Some(JobRequest::new("1", "", "/tmp/somewhere", vec!["--first-arg".to_string()]))); + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + let persistence = ConsulPersistence::new(None, None, None, None); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_command".to_string()]; + let (tx, _) = mpsc::channel(); + + let (status, response) = process_submission(url, request_body, &server_manager, &persistence, &command_store, &tx); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Validation Error: No valid value found: field 'jobName' cannot be empty"}"#); +} + +#[derive(Debug)] +struct NoopCommandMock; + +impl Execution for NoopCommandMock { + fn get_command(&self, _: &str) -> Result { + Ok("/noop/command".to_string()) + } + + fn execute(&self, _: String, _: Vec) -> Result { + Ok("NOOP command".to_string()) + } +} + +#[test] +#[ignore] +// Not able to test yet +fn process_submission_fail_job_already_run() { + let url = Url::parse("http://not.a.real.address/").unwrap(); + let request_body = Ok(Some(JobRequest::new("1", "dummy", "/tmp", vec!["--no-colour".to_string()]))); + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + let persistence = ConsulPersistence::new(None, None, None, None); + let noop_command = NoopCommandMock; + let (tx, rx) = mpsc::channel(); + + let (status, response) = process_submission(url, request_body, &server_manager, &persistence, &noop_command, &tx); + + assert_eq!(status, status::BadRequest); + assert_eq!(response, r#"{"message":"Job has already been run"}"#); +} \ No newline at end of file diff --git a/factotum-server/src/factotum_server/server/mod.rs b/factotum-server/src/factotum_server/server/mod.rs new file mode 100644 index 0000000..c8200c5 --- /dev/null +++ b/factotum-server/src/factotum_server/server/mod.rs @@ -0,0 +1,297 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use std::collections::HashMap; +use std::error; +use std::fmt; +use std::fs::File; +use std::io::Read; +use std::path::Path; +use chrono::{DateTime, UTC}; +use crypto::digest::Digest; +use crypto::sha2::Sha256; +use getopts::Options; +use serde_json; + +use factotum_server::command::Execution; + +#[cfg(test)] +mod tests; + +#[derive(Debug)] +pub struct ServerManager { + pub ip: String, + pub port: u32, + pub state: String, + pub start_time: DateTime, + pub webhook_uri: String, + pub no_colour: bool, +} + +impl ServerManager { + pub fn new(wrapped_ip: Option, port: u32, webhook_uri: String, no_colour: bool) -> ServerManager { + ServerManager { + ip: if let Some(ip) = wrapped_ip { ip } else { ::IP_DEFAULT.to_string() }, + port: if port > 0 && port <= 65535 { port } else { ::PORT_DEFAULT }, + state: ::SERVER_STATE_RUN.to_string(), + start_time: UTC::now(), + webhook_uri: webhook_uri.to_string(), + no_colour: no_colour, + } + } + + pub fn is_running(&self) -> bool { + self.state == ::SERVER_STATE_RUN + } + + pub fn get_start_time(&self) -> String { + self.start_time.format("%F %T %Z").to_string() + } + + pub fn get_uptime(&self) -> String { + let uptime = UTC::now().signed_duration_since(self.start_time); + let seconds = uptime.num_seconds() % 60; + let minutes = uptime.num_minutes() % 60; + let hours = uptime.num_hours() % 24; + let days = uptime.num_days(); + format!("{} Days, {} Hours, {} Minutes, {} Seconds", days, hours, minutes, seconds) + } +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct JobRequest { + #[serde(skip_deserializing)] + pub job_id: String, + pub job_name: String, + pub factfile_path: String, + pub factfile_args: Vec +} + +impl JobRequest { + pub fn new(job_id: &str, job_name: &str, factfile_path: &str, factfile_args: Vec) -> JobRequest { + JobRequest { + job_id: job_id.to_owned(), + job_name: job_name.to_owned(), + factfile_path: factfile_path.to_owned(), + factfile_args: factfile_args, + } + } + + pub fn validate(request: JobRequest, command_store: &U) -> Result { + // check job name not empty + // check factfile path not empty + // check factfile args not empty + if request.job_name == "" { + let message = "No valid value found: field 'jobName' cannot be empty".to_string(); + error!("{}", message); + return Err(ValidationError::no_output(message)) + } else if request.factfile_path == "" { + let message = "No valid value found: field 'factfilePath' cannot be empty".to_string(); + error!("{}", message); + return Err(ValidationError::no_output(message)) + } + // check valid factfile path exists + if !Path::new(&request.factfile_path).exists() { + let message = format!("Value does not exist on host for 'factfilePath':'{}'", request.factfile_path); + error!("{}", message); + return Err(ValidationError::no_output(message)) + } + // attempt dry run + let cmd_path = try!(command_store.get_command(::FACTOTUM)); + let mut cmd_args = vec!["run".to_string(), request.factfile_path.clone(), "--dry-run".to_string()]; + cmd_args.extend_from_slice(request.factfile_args.as_slice()); + match command_store.execute(cmd_path, cmd_args) { + Ok(_) => { + debug!("Dry run success"); + }, + Err(e) => { + error!("{}", e); + return Err(ValidationError::no_output(e)) + } + } + // generate unique job id + let mut request = request; + let tags = match extract_tags(&request.factfile_args) { + Ok(extracted) => extracted, + Err(e) => { + error!("{}", e); + return Err(ValidationError::no_output(e)) + } + }; + request.job_id = match generate_id(&request.factfile_path, tags) { + Ok(id) => id, + Err(e) => { + error!("{}", e); + return Err(ValidationError::no_output(e)) + } + }; + Ok(request) + } + + pub fn append_job_args(server: &ServerManager, job: &mut JobRequest) { + if server.webhook_uri != "" { + job.factfile_args.push("--webhook".to_string()); + job.factfile_args.push(server.webhook_uri.clone()); + } + if server.no_colour { + job.factfile_args.push("--no-colour".to_string()); + } + } +} + +impl PartialEq for JobRequest { + fn eq(&self, other: &JobRequest) -> bool { + self.job_id == other.job_id && + self.job_name == other.job_name && + self.factfile_path == other.factfile_path && + self.factfile_args == other.factfile_args + } +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SettingsRequest { + pub state: String +} + +impl PartialEq for SettingsRequest { + fn eq(&self, other: &SettingsRequest) -> bool { + self.state == other.state + } +} + +impl SettingsRequest { + pub fn new(state: &str) -> SettingsRequest { + SettingsRequest { + state: state.to_owned() + } + } + + pub fn validate(request: SettingsRequest) -> Result { + match request.state.as_ref() { + ::SERVER_STATE_RUN | ::SERVER_STATE_DRAIN => Ok(request), + _ => Err(ValidationError::no_output(format!("Invalid 'state', must be one of ({}|{})", ::SERVER_STATE_RUN, ::SERVER_STATE_DRAIN))) + } + } +} + +#[derive(Debug, PartialEq)] +pub struct ValidationError { + pub error: String, + pub stdout: String, + pub stderr: String +} + +impl ValidationError { + pub fn new(error: String, stdout: String, stderr: String) -> ValidationError { + ValidationError { + error: error, + stdout: stdout, + stderr: stderr, + } + } + + pub fn no_output(error: String) -> ValidationError { + ValidationError::new(error, String::new(), String::new()) + } +} + +impl fmt::Display for ValidationError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Validation Error: {}", self.error) + } +} + +impl error::Error for ValidationError { + fn description(&self) -> &str { + &self.error + } +} + +impl From for ValidationError { + fn from(err: String) -> ValidationError { + ValidationError::no_output(err) + } +} + +fn generate_id(factfile: &str, tags: Option>) -> Result { + let mut fh = try!(File::open(factfile) + .map_err(|e| format!("Could not open '{}' for reading: {}", factfile, e))); + let mut file = String::new(); + try!(fh.read_to_string(&mut file).map_err(|e| format!("Could not read '{}': {}", factfile, e))); + let schema: serde_json::Value = try!(serde_json::from_str(&file).map_err(|e| e.to_string())); + let ff: String = try!(serde_json::to_string(&schema).map_err(|e| e.to_string())); + let mut job_digest = Sha256::new(); + job_digest.input_str(&ff); + + if let Some(ref tags_map) = tags { + let mut sorted_keys:Vec<_> = tags_map.keys().collect(); + sorted_keys.sort(); + for key in sorted_keys { + job_digest.input_str(key); + job_digest.input_str(&tags_map[key]); + } + } + + Ok(job_digest.result_str()) +} + +fn extract_tags(factfile_args: &Vec) -> Result>, String> { + let mut opts = Options::new(); + opts.optmulti("", "tag", "Add Factotum job metadata (tags).", "TAG"); + opts.optmulti("", "constraint", "Checks for an external constraint that will prevent execution; allowed constraints (host).", "CONSTRAINT"); + opts.optopt("", "start", "Begin at specified task.", "TASK"); + opts.optopt("", "env", "Supply JSON to define mustache variables in Factfile.", "ENV"); + opts.optopt("", "webhook", "Post updates on job execution to the specified URL.", "URL"); + opts.optopt("", "output", "File to print output to. Used with `dot`.", "FILE"); + opts.optflag("", "dry-run", "Pretend to execute a Factfile, showing the commands that would be executed. Can be used with other options."); + opts.optflag("", "no-colour", "Turn off ANSI terminal colours/formatting in output."); + opts.optflag("", "overwrite", "Overwrite the output file if it exists."); + + let matches = match opts.parse(factfile_args) { + Ok(m) => m, + Err(e) => { + return Err(format!("Error parsing tags in factfile args: {:?}", e)) + }, + }; + + if matches.opt_present("tag") { + let tag_map = get_tag_map(&matches.opt_strs("tag")); + trace!("Extracted tags: {:?}", tag_map); + Ok(Some(tag_map)) + } else { + trace!("NO TAGS FOUND"); + Ok(None) + } +} + +fn get_tag_map(args: &Vec) -> HashMap { + let mut arg_map: HashMap = HashMap::new(); + + for arg in args.iter() { + let split = arg.split(",").collect::>(); + if split.len() >= 2 && split[0].trim().is_empty() == false { + let key = split[0].trim().to_string(); + let value = split[1..].join("").trim().to_string(); + arg_map.insert(key, value); + } else if split.len() == 1 && split[0].trim().is_empty() == false { + let key = split[0].trim().to_string(); + let value = "".to_string(); + arg_map.insert(key, value); + } + } + + arg_map +} diff --git a/factotum-server/src/factotum_server/server/tests.rs b/factotum-server/src/factotum_server/server/tests.rs new file mode 100644 index 0000000..4a83644 --- /dev/null +++ b/factotum-server/src/factotum_server/server/tests.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use super::*; +use std::error::Error; +use regex::Regex; + +#[test] +fn create_new_server_manager() { + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://a.webhook.com/".to_string(), true); + + assert_eq!(server_manager.ip, "0.0.0.0"); + assert_eq!(server_manager.port, 8080); + assert_eq!(server_manager.state, ::SERVER_STATE_RUN); + assert_eq!(server_manager.start_time.date(), UTC::today()); + assert_eq!(server_manager.webhook_uri, "http://a.webhook.com/"); + assert!(server_manager.no_colour); +} + +#[test] +fn server_manager_is_running() { + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + assert!(server_manager.is_running()); +} + +#[test] +fn server_manager_is_not_running() { + let mut server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + server_manager.state = ::SERVER_STATE_DRAIN.to_string(); + assert_eq!(server_manager.is_running(), false); +} + +#[test] +fn server_manager_get_start_time() { + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + assert_eq!(server_manager.get_start_time(), UTC::now().format("%F %T %Z").to_string()); +} + +#[test] +fn server_manager_get_uptime() { + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), false); + assert!(Regex::new(r"^\d+ Days, \d+ Hours, \d+ Minutes, \d+ Seconds$").unwrap().is_match(&server_manager.get_uptime())); +} + +#[test] +fn job_request_empty_job_name() { + let job_request = JobRequest::new("1", "", "/tmp/somewhere", vec![]); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_path".to_string()]; + let validation_error = JobRequest::validate(job_request.clone(), &command_store).unwrap_err(); + assert_eq!(validation_error, ValidationError::no_output("No valid value found: field 'jobName' cannot be empty".to_string())); +} + +#[test] +fn job_request_empty_factfile_path() { + let job_request = JobRequest::new("1", "dummy", "", vec![]); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_path".to_string()]; + let validation_error = JobRequest::validate(job_request.clone(), &command_store).unwrap_err(); + assert_eq!(validation_error, ValidationError::no_output("No valid value found: field 'factfilePath' cannot be empty".to_string())); +} + +#[test] +fn job_request_invalid_factfile_path() { + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + let command_store = commands![::FACTOTUM.to_string() => "/tmp/fake_path".to_string()]; + let validation_error = JobRequest::validate(job_request.clone(), &command_store).unwrap_err(); + assert_eq!(validation_error, ValidationError::no_output("Value does not exist on host for 'factfilePath':'/tmp/somewhere'".to_string())); +} + +#[test] +fn job_request_can_append_job_args() { + let server_manager = ServerManager::new(Some("0.0.0.0".to_string()), 8080, "http://dummy.test/".to_string(), true); + let mut job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec!["--first-arg".to_string()]); + JobRequest::append_job_args(&server_manager, &mut job_request); + assert_eq!(job_request.factfile_args, vec!["--first-arg", "--webhook", "http://dummy.test/", "--no-colour"]); +} + +#[test] +fn settings_request_is_valid() { + let settings_request = SettingsRequest::new(::SERVER_STATE_RUN); + let validated_settings_request = SettingsRequest::validate(settings_request.clone()).unwrap(); + assert_eq!(validated_settings_request, settings_request); +} + +#[test] +fn settings_request_is_invalid() { + let settings_request = SettingsRequest::new("NOT A SERVER STATE"); + let validation_error = SettingsRequest::validate(settings_request).err().unwrap(); + assert_eq!(validation_error.description(), "Invalid 'state', must be one of (run|drain)"); +} diff --git a/factotum-server/src/factotum_server/tests.rs b/factotum-server/src/factotum_server/tests.rs new file mode 100644 index 0000000..a1d1d0b --- /dev/null +++ b/factotum-server/src/factotum_server/tests.rs @@ -0,0 +1,140 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +use super::*; +use std::time::Duration; + +#[test] +fn worker_manager_spawn_and_exit() { + let (tx, rx) = mpsc::channel(); + let pool = ThreadPool::new(2); + let persistence = ConsulPersistence::new(None, None, None, None); + let command_store = commands!["dummy".to_string() => "/tmp/fake_command".to_string()]; + + let handle = spawn_worker_manager(tx.clone(), rx, VecDeque::new(), 2, pool.clone(), persistence, command_store); + tx.send(Dispatch::StopProcessing).unwrap(); + + let output = handle.join().unwrap(); + assert_eq!("EXITING WORKER MANAGER", output); +} + +#[test] +fn send_status_update_success() { + let (tx, rx) = mpsc::channel(); + let query = Query::new("status_query".to_string(), tx); + let pool = ThreadPool::new(2); + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + let mut requests_queue = VecDeque::new(); + requests_queue.push_back(job_request); + + send_status_update(query, &mut requests_queue, 10, pool); + + let actual = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + let expected = DispatcherStatus { + workers: WorkerStatus { + total: 2, + idle: 2, + active: 0, + }, + jobs: JobStatus { + max_queue_size: 10, + in_queue: 1, + fail_count: 0, + success_count: 0 + } + }; + assert_eq!(expected, actual); +} + +#[test] +fn is_queue_full_true() { + let (tx, rx) = mpsc::channel(); + let query = Query::new("queue_query".to_string(), tx); + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + let mut requests_queue = VecDeque::new(); + requests_queue.push_back(job_request.clone()); + requests_queue.push_back(job_request.clone()); + + is_queue_full(query, &mut requests_queue, 2); + + let result = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + assert_eq!(true, result); +} + +#[test] +fn is_queue_full_false() { + let (tx, rx) = mpsc::channel(); + let query = Query::new("queue_query".to_string(), tx); + let mut requests_queue = VecDeque::new(); + + is_queue_full(query, &mut requests_queue, 2); + + let result = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + assert_eq!(false, result); +} + +#[test] +fn new_job_request_success() { + let (tx, rx) = mpsc::channel(); + let pool = ThreadPool::new(2); + let persistence = ConsulPersistence::new(None, None, None, None); + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + let mut requests_queue = VecDeque::new(); + + new_job_request(tx.clone(), &mut requests_queue, pool.clone(), persistence, job_request.clone()); + + let output = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + assert_eq!(Dispatch::ProcessRequest, output); + assert!(requests_queue.contains(&job_request)); +} + +#[test] +fn process_job_request_failure() { + let (tx, rx) = mpsc::channel(); + let pool = ThreadPool::new(2); + let persistence = ConsulPersistence::new(None, None, None, None); + let command_store = commands!["dummy".to_string() => "/tmp/fake_command".to_string()]; + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + let mut requests_queue = VecDeque::new(); + requests_queue.push_back(job_request.clone()); + + process_job_request(tx.clone(), &mut requests_queue, pool.clone(), persistence, command_store); + + let output = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + assert_eq!(Dispatch::RequestFailure(job_request), output); +} + +#[test] +fn complete_job_request_success() { + let (tx, rx) = mpsc::channel(); + let persistence = ConsulPersistence::new(None, None, None, None); + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + + complete_job_request(tx, persistence, job_request); + + let output = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + assert_eq!(Dispatch::ProcessRequest, output); +} + +#[test] +fn failed_job_request_success() { + let (tx, rx) = mpsc::channel(); + let persistence = ConsulPersistence::new(None, None, None, None); + let job_request = JobRequest::new("1", "dummy", "/tmp/somewhere", vec![]); + + failed_job_request(tx, persistence, job_request); + + let output = rx.recv_timeout(Duration::from_millis(1000)).unwrap(); + assert_eq!(Dispatch::ProcessRequest, output); +} diff --git a/factotum-server/src/main.rs b/factotum-server/src/main.rs new file mode 100644 index 0000000..6e9046b --- /dev/null +++ b/factotum-server/src/main.rs @@ -0,0 +1,194 @@ +// Copyright (c) 2016-2017 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, and +// you may not use this file except in compliance with the Apache License +// Version 2.0. You may obtain a copy of the Apache License Version 2.0 at +// http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the Apache License Version 2.0 is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the Apache License Version 2.0 for the specific language +// governing permissions and limitations there under. +// + +#[macro_use] +extern crate log; +extern crate log4rs; +extern crate docopt; +extern crate getopts; +extern crate chrono; +#[macro_use] +extern crate lazy_static; +extern crate regex; +extern crate url; +extern crate crypto; +extern crate threadpool; +extern crate iron; +#[macro_use(router)] +extern crate router; +extern crate bodyparser; +extern crate persistent; +extern crate logger; +extern crate rustc_serialize; +extern crate serde; +#[macro_use] +extern crate serde_derive; +#[macro_use] +extern crate serde_json; +extern crate consul; +extern crate base64; + +use docopt::Docopt; +use log::LogLevelFilter; +use regex::Regex; + +mod factotum_server; + +const VERSION: &'static str = env!("CARGO_PKG_VERSION"); + +const FACTOTUM: &'static str = "factotum"; + +const IP_DEFAULT: &'static str = "0.0.0.0"; +const PORT_DEFAULT: u32 = 3000; +const MAX_JOBS_DEFAULT: usize = 1000; +const MAX_WORKERS_DEFAULT: usize = 20; + +const CONSUL_NAME_DEFAULT: &'static str = FACTOTUM; +const CONSUL_IP_DEFAULT: &'static str = "127.0.0.1"; +const CONSUL_PORT_DEFAULT: u32 = 8500; +const CONSUL_NAMESPACE_DEFAULT: &'static str = "com.snowplowanalytics/factotum"; + +const SERVER_STATE_RUN: &'static str = "run"; +const SERVER_STATE_DRAIN: &'static str = "drain"; + +const VALID_IP_REGEX: &'static str = r"\b(?:(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\b"; + +const USAGE: &'static str = + " +Factotum Server. + +Usage: + factotum_server --factotum-bin= [--ip=
] [--port=] [--max-jobs=] [--max-workers=] [--webhook=] [--no-colour] [--consul-name=] [--consul-ip=
] [--consul-port=] [--consul-namespace=] [--log-level=] + factotum_server (-h | --help) + factotum_server (-v | --version) + +Options: + -h --help Show this screen. + -v --version Display the version of Factotum Server and exit. + --ip=
Specify binding IP address. + --port= Specify port number. + --log-level= Specify logging level. + --max-jobs= Max size of job requests queue. + --max-workers= Max number of workers. + --factotum-bin= Path to Factotum binary file. + --webhook= Factotum arg to post updates on job execution to the specified URL. + --no-colour Factotum arg to turn off ANSI terminal colours/formatting in output. + --consul-name= Specify node name of Consul server agent. + --consul-ip=
Specify IP address for Consul server agent. + --consul-port= Specify port number for Consul server agent. + --consul-namespace= Specify namespace of job references stored in Consul persistence. + +"; + +#[derive(Debug, RustcDecodable)] +pub struct Args { + flag_version: bool, + flag_ip: Option, + flag_port: u32, + flag_log_level: Option, + flag_max_jobs: usize, + flag_max_workers: usize, + flag_factotum_bin: String, + flag_webhook: String, + flag_no_colour: bool, + flag_consul_name: Option, + flag_consul_ip: Option, + flag_consul_port: Option, + flag_consul_namespace: Option, +} + +fn main() { + let args: Args = Docopt::new(USAGE) + .and_then(|d| d.decode()) + .unwrap_or_else(|e| e.exit()); + + if args.flag_version { + println!("Factotum Server version [{}]", VERSION); + } else { + check_factotum_bin_arg(&args.flag_factotum_bin); + check_ip_arg(&args.flag_ip); + check_ip_arg(&args.flag_consul_ip); + check_and_init_logger(&args.flag_log_level); + factotum_server::start(args); + } +} + +// --- Helpers --- + +fn check_factotum_bin_arg(factotum_bin: &str) { + if !std::path::Path::new(factotum_bin).exists() { + println!("Invalid path for Factotum binary at: '{}'", factotum_bin); + std::process::exit(1) + } +} + +fn check_ip_arg(wrapped_ip: &Option) { + if let Some(ip) = wrapped_ip.as_ref() { + if !is_a_valid_ip(&ip) { + println!("Invalid IP address: [{}] - Regex mismatch", ip); + std::process::exit(1) + } + } +} + +fn check_and_init_logger(level_input: &Option) { + let log_level = get_log_level(level_input); + let log_config = get_log_config(log_level).unwrap(); + log4rs::init_config(log_config).unwrap(); +} + +fn is_a_valid_ip(text: &str) -> bool { + lazy_static! { + static ref RE: Regex = Regex::new(::VALID_IP_REGEX).unwrap(); + } + RE.is_match(text) +} + +fn get_log_level(level_input: &Option) -> LogLevelFilter { + let log_level = match level_input.as_ref() { + Some(input) => input, + None => return LogLevelFilter::Warn, + }; + match log_level.to_lowercase().as_ref() { + "off" => LogLevelFilter::Off, + "error" => LogLevelFilter::Error, + "warn" => LogLevelFilter::Warn, + "info" => LogLevelFilter::Info, + "debug" => LogLevelFilter::Debug, + "trace" => LogLevelFilter::Trace, + _ => { + println!("Unknown log level: '{}'", log_level); + println!("Please select a valid log level."); + std::process::exit(1) + }, + } +} + +fn get_log_config(log_level: LogLevelFilter) -> Result { + use log4rs::append::console::ConsoleAppender; + use log4rs::encode::pattern::PatternEncoder; + use log4rs::config::{Appender, Config, Root}; + + let stdout = ConsoleAppender::builder() + .encoder(Box::new(PatternEncoder::new("{d} {l:>5} - {m}{n}"))) + .build(); + + let root = Root::builder() + .appender("stdout") + .build(log_level); + + Config::builder() + .appender(Appender::builder().build("stdout", Box::new(stdout))) + .build(root) +} diff --git a/src/factotum/webhook/jobupdate/mod.rs b/src/factotum/webhook/jobupdate/mod.rs index 94da371..036cae3 100644 --- a/src/factotum/webhook/jobupdate/mod.rs +++ b/src/factotum/webhook/jobupdate/mod.rs @@ -20,7 +20,7 @@ static JOB_UPDATE_SCHEMA_NAME: &'static str = "iglu:com.snowplowanalytics.\ static TASK_UPDATE_SCHEMA_NAME: &'static str = "iglu:com.snowplowanalytics.\ factotum/task_update/jsonschema/1-0-0"; -const MAX_STDOUT_STDERR_SIZE: usize = 10_000; // 10kb +const MAX_STDOUT_STDERR_SIZE: usize = 4_000; // 4kb use factotum::executor::{ExecutionState, ExecutionUpdate, TaskSnapshot, Transition as ExecutorTransition}; diff --git a/src/factotum/webhook/jobupdate/tests.rs b/src/factotum/webhook/jobupdate/tests.rs index 24cf6cb..fbf0025 100644 --- a/src/factotum/webhook/jobupdate/tests.rs +++ b/src/factotum/webhook/jobupdate/tests.rs @@ -315,7 +315,7 @@ fn big_task_stdout_trimmed() { let now = UTC::now(); - let max_len = 10000; + let max_len = 4000; example_tasks[0].state = State::Success; example_tasks[0].run_started = Some(now.clone()); @@ -373,7 +373,7 @@ fn big_task_stderr_trimmed() { let now = UTC::now(); - let max_len = 10000; + let max_len = 4000; example_tasks[0].state = State::Success; example_tasks[0].run_started = Some(now.clone()); diff --git a/vagrant/up.playbooks b/vagrant/up.playbooks index a1e446b..d700dfe 100644 --- a/vagrant/up.playbooks +++ b/vagrant/up.playbooks @@ -1 +1,2 @@ oss-playbooks/rust.yml +oss-playbooks/consul-0.7.5.yml