Skip to content

Commit

Permalink
Merge pull request #6344 from habitat-sh/cm/nats-protobuf
Browse files Browse the repository at this point in the history
Initial protobufs for events
  • Loading branch information
christophermaier authored May 17, 2019
2 parents 27b1c56 + 74d7189 commit b9652b9
Show file tree
Hide file tree
Showing 24 changed files with 1,632 additions and 361 deletions.
27 changes: 26 additions & 1 deletion .expeditor/verify.pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ steps:
- label: "[unit] :linux: butterfly ignored"
command:
# put this back once the deadlock detection comes back
#- ./test/run_cargo_test.sh --nightly --features "deadlock_detection" --test-options "--test-threads=1 --ignored" butterfly
#- ./test/run_cargo_test.sh butterfly --nightly --features "deadlock_detection" --test-options "--test-threads=1 --ignored"
- ./test/run_cargo_test.sh butterfly --test-options "--test-threads=1 --ignored"
agents:
queue: 'default-privileged'
Expand Down Expand Up @@ -307,6 +307,20 @@ steps:
timeout_in_minutes: 10
soft_fail: true

- label: "[unit] :linux: sup nitox_stream"
command:
- ./test/run_cargo_test.sh sup --features "ignore_integration_tests nitox_stream"
agents:
queue: 'default-privileged'
plugins:
docker#v3.0.1:
always-pull: true
user: "buildkite-agent"
group: "buildkite-agent"
image: "chefes/buildkite"
timeout_in_minutes: 10
soft_fail: true

- label: "[unit] :linux: sup-client"
command:
- ./test/run_cargo_test.sh sup-client
Expand Down Expand Up @@ -480,6 +494,17 @@ steps:
automatic:
limit: 1

- label: "[unit] :windows: sup nitox_stream"
command:
# This test has test (not code) concurrency issues and will fail if we don't limit it
- ./test/run_cargo_test.ps1 sup -Features "nitox_stream" -TestOptions "--test-threads=1"
agents:
queue: 'default-windows-privileged'
timeout_in_minutes: 40
retry:
automatic:
limit: 1

- label: "[unit] :windows: sup-client"
command:
- ./test/run_cargo_test.ps1 sup-client
Expand Down
123 changes: 118 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions components/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ pub enum Error {
GossipFileRelativePath(String),
HabitatCore(hcore::Error),
InstallHookFailed(PackageIdent),
InvalidEventStreamToken(String),
InvalidInstallHookMode(String),
/// Occurs when making lower level IO calls.
IO(io::Error),
/// Errors when joining paths :)
JoinPathsError(env::JoinPathsError),
MissingCLIInputError(String),
NetParseError(net::AddrParseError),
OfflineArtifactNotFound(PackageIdent),
OfflineOriginKeyNotFound(String),
Expand Down Expand Up @@ -98,9 +100,15 @@ impl fmt::Display for Error {
s)
}
Error::HabitatCore(ref e) => format!("{}", e),
Error::MissingCLIInputError(ref arg) => {
format!("Missing required CLI argument!: {}", arg)
}
Error::InstallHookFailed(ref ident) => {
format!("Install hook exited unsuccessfully: {}", ident)
}
Error::InvalidEventStreamToken(ref s) => {
format!("Invalid event stream token provided: '{}'", s)
}
Error::InvalidInstallHookMode(ref e) => {
format!("Invalid InstallHookMode conversion from {}", e)
}
Expand Down Expand Up @@ -164,9 +172,11 @@ impl error::Error for Error {
}
Error::HabitatCore(ref err) => err.description(),
Error::InstallHookFailed(_) => "Install hook exited unsuccessfully",
Error::InvalidEventStreamToken(_) => "Invalid event stream token provided",
Error::InvalidInstallHookMode(_) => "Invalid InstallHookMode",
Error::IO(ref err) => err.description(),
Error::JoinPathsError(ref err) => err.description(),
Error::MissingCLIInputError(_) => "Missing required CLI argument!",
Error::NetParseError(_) => "Can't parse IP:port",
Error::OfflineArtifactNotFound(_) => "Cached artifact not found in offline mode",
Error::OfflineOriginKeyNotFound(_) => "Cached origin key not found in offline mode",
Expand Down
131 changes: 130 additions & 1 deletion components/common/src/types/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
mod listen_ctl_addr;

pub use self::listen_ctl_addr::ListenCtlAddr;
use crate::error::Error;
use clap::ArgMatches;
use std::{collections::HashMap,
fmt,
result,
str::FromStr};

/// Bundles up information about the user and group that a supervised
/// service should be run as. If the Supervisor itself is running with
Expand All @@ -22,3 +27,127 @@ pub struct UserInfo {
/// Linux preferred
pub gid: Option<u32>,
}

/// Captures arbitrary key-value pair metadata to attach to all events
/// generated by the Supervisor.
#[derive(Clone, Debug, Default)]
pub struct EventStreamMetadata(HashMap<String, String>);

impl Into<HashMap<String, String>> for EventStreamMetadata {
fn into(self) -> HashMap<String, String> { self.0 }
}

impl EventStreamMetadata {
/// The name of the Clap argument we'll use for arguments of this type.
pub const ARG_NAME: &'static str = "EVENT_STREAM_METADATA";

/// Ensure that user input from Clap can be converted into a
/// key-value pair we can consume.
///
/// Note: this validates each value given by the user, not all the
/// values given at once.
#[allow(clippy::needless_pass_by_value)] // Signature required by CLAP
pub fn validate(value: String) -> result::Result<(), String> {
Self::split_raw(&value).map(|_| ())
}

/// Utility function to create a key-value pair tuple from a
/// user-provided value in Clap.
fn split_raw(raw: &str) -> result::Result<(String, String), String> {
match raw.split('=').collect::<Vec<_>>().as_slice() {
[key, value] if !key.is_empty() && !value.is_empty() => {
Ok((key.to_string(), value.to_string()))
}
_ => {
Err(format!("Invalid key-value pair given (must be \
'='-delimited pair of non-empty strings): {}",
raw))
}
}
}

/// Same as `split_raw`, but for running on already-validated
/// input (thus, this function cannot fail).
fn split_validated(validated_input: &str) -> (String, String) {
Self::split_raw(validated_input).expect("EVENT_STREAM_METADATA should be validated at \
this point")
}
}

impl<'a> From<&'a ArgMatches<'a>> for EventStreamMetadata {
/// Create an instance of `EventStreamMetadata` from validated
/// user input.
fn from(m: &ArgMatches) -> Self {
let raw_meta = m.values_of(Self::ARG_NAME).unwrap_or_default();
Self(raw_meta.map(Self::split_validated).collect())
}
}

/// This represents an environment variable that holds an authentication token which enables
/// integration with Automate. Supervisors use this token to connect to the messaging server
/// on the Automate side in order to send data about the services they're running via event
/// messages. If the environment variable is present, its value is the auth token. If it's not
/// present and the feature flag for the Event Stream is enabled, initialization of the Event
/// Stream will fail.
#[derive(Clone, Debug)]
pub struct AutomateAuthToken(String);

impl AutomateAuthToken {
/// The name of the Clap argument we'll use for arguments of this type.
pub const ARG_NAME: &'static str = "EVENT_STREAM_TOKEN";
// Ideally, we'd like to take advantage of
// `habitat_core::env::Config` trait, but that currently requires
// a `Default` implementation, and there isn't really a legitimate
// default value right now.
pub const ENVVAR: &'static str = "HAB_AUTOMATE_AUTH_TOKEN";
}

impl AutomateAuthToken {
/// Ensure that user input from Clap can be converted an instance
/// of a token.
#[allow(clippy::needless_pass_by_value)] // Signature required by CLAP
pub fn validate(value: String) -> result::Result<(), String> {
value.parse::<Self>().map(|_| ()).map_err(|e| e.to_string())
}
}

impl<'a> From<&'a ArgMatches<'a>> for AutomateAuthToken {
/// Create an instance of `AutomateAuthToken` from validated
/// user input.
fn from(m: &ArgMatches) -> Self {
m.value_of(Self::ARG_NAME)
.expect("HAB_AUTOMATE_AUTH_TOKEN should be set")
.parse()
.expect("HAB_AUTOMATE_AUTH_TOKEN should be validated at this point")
}
}

impl FromStr for AutomateAuthToken {
type Err = Error;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if s.is_empty() {
Err(Error::InvalidEventStreamToken(s.to_string()))
} else {
Ok(AutomateAuthToken(s.to_string()))
}
}
}

impl fmt::Display for AutomateAuthToken {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{}", self.0) }
}

#[cfg(test)]
mod test {
use super::*;

mod auth_token {
use super::*;

#[test]
fn cannot_parse_from_empty_string() { assert!("".parse::<AutomateAuthToken>().is_err()) }

}

}
1 change: 1 addition & 0 deletions components/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cc = "*"
[dependencies]
base64 = "*"
dirs = "*"
dns-lookup = "*"
errno = "*"
hex = "*"
lazy_static = "*"
Expand Down
58 changes: 58 additions & 0 deletions components/core/src/os/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,61 @@ mod imp;
mod imp;

pub use self::imp::*;

use std::io;

/// Returns the fqdn from the provided hostname.
pub fn lookup_fqdn(hostname: &str) -> io::Result<String> {
#[cfg(not(windows))]
let flags = libc::AI_CANONNAME;

#[cfg(windows)]
let flags = winapi::shared::ws2def::AI_CANONNAME;

let hints = dns_lookup::AddrInfoHints { flags,
..dns_lookup::AddrInfoHints::default() };

// If 'hints.flags' includes the AI_CANONNAME flag, then the ai_canonname
// field of the first of the addrinfo structures in the returned list is set
// to point to the official name of the host.
if let Some(first_result) = dns_lookup::getaddrinfo(Some(hostname), None, Some(hints))?.next() {
match first_result {
Ok(f) => Ok(f.canonname.expect("Some(canonname) if requested")),
Err(e) => {
debug!("lookup_fqdn() was unable to lookup the machine fqdn. {:?}",
e);
Ok(hostname.to_string())
}
}
} else {
Ok(hostname.to_string())
}
}

/// Returns the fully qualified domain name of the running machine.
pub fn fqdn() -> Option<String> {
let result = dns_lookup::get_hostname().and_then(|hostname| lookup_fqdn(&hostname));
if let Err(ref e) = result {
debug!("fqdn() was unable to lookup the machine fqdn. {:?}", e);
}
result.ok()
}

#[cfg(not(windows))]
#[test]
fn test_fqdn_lookup() {
let fqdn = lookup_fqdn("localhost");
assert!(fqdn.is_ok());
assert_eq!(fqdn.unwrap(),
String::from("localhost"),
"the fqdn of localhost should be localhost");
}

#[cfg(not(windows))]
#[test]
fn test_fqdn_lookup_err() {
let fqdn = lookup_fqdn("");
assert!(fqdn.is_err(), "Should be an Err()");
assert_eq!(format!("{}", fqdn.unwrap_err()),
"failed to lookup address information: Name or service not known");
}
1 change: 0 additions & 1 deletion components/hab/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ lazy_static = "*"
libc = "*"
log = "*"
pbr = "*"
protobuf = "1.5.1"
retry = "*"
serde = "*"
serde_derive = "*"
Expand Down
Loading

0 comments on commit b9652b9

Please sign in to comment.