Skip to content

Commit

Permalink
cleanup: remove sample token, default conn info struct
Browse files Browse the repository at this point in the history
Signed-off-by: Gina Peers <gpeers@chef.io>
  • Loading branch information
Gina Peers committed Apr 11, 2019
1 parent 149220d commit 5e810e3
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 81 deletions.
150 changes: 72 additions & 78 deletions components/sup/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,34 @@
mod types;

use self::types::{EventMessage,
EventMetadata,
HealthCheckEvent,
ServiceStartedEvent,
ServiceStoppedEvent};
use self::types::{
EventMessage, EventMetadata, HealthCheckEvent, ServiceStartedEvent, ServiceStoppedEvent,
};
use crate::AutomateAuthToken;
use crate::{error::Result,
manager::{service::{HealthCheck,
Service},
sys::Sys}};
use crate::{
error::Result,
manager::{
service::{HealthCheck, Service},
sys::Sys,
},
};
use clap::ArgMatches;
use futures::{sync::{mpsc as futures_mpsc,
mpsc::UnboundedSender},
Future,
Stream};
use futures::{
sync::{mpsc as futures_mpsc, mpsc::UnboundedSender},
Future, Stream,
};
use habitat_common::types::EventStreamMetadata;
use nitox::{commands::ConnectCommand,
streaming::{client::NatsStreamingClient,
error::NatsStreamingError},
NatsClient,
NatsClientOptions};
use nitox::{
commands::ConnectCommand, streaming::client::NatsStreamingClient, NatsClient, NatsClientOptions,
};
use state::Container;
use std::{net::SocketAddr,
sync::{mpsc as std_mpsc,
Once},
thread,
time::Duration};
use tokio::{executor,
runtime::current_thread::Runtime as ThreadRuntime};
use std::{
net::SocketAddr,
sync::{mpsc as std_mpsc, Once},
thread,
time::Duration,
};
use tokio::{executor, runtime::current_thread::Runtime as ThreadRuntime};

static INIT: Once = Once::new();
lazy_static! {
Expand Down Expand Up @@ -87,19 +86,23 @@ pub fn init_stream(conn_info: EventConnectionInfo, event_core: EventCore) {
pub struct EventStreamConfig {
environment: String,
application: String,
meta: EventStreamMetadata,
meta: EventStreamMetadata,
}

impl EventStreamConfig {
/// Create an instance from Clap arguments.
pub fn from_matches(m: &ArgMatches) -> Result<EventStreamConfig> {
Ok(EventStreamConfig { environment: m.value_of("EVENT_STREAM_ENVIRONMENT")
.map(str::to_string)
.expect("Required option for EventStream feature"),
application: m.value_of("EVENT_STREAM_APPLICATION")
.map(str::to_string)
.expect("Required option for EventStream feature"),
meta: EventStreamMetadata::from_matches(m)?, })
Ok(EventStreamConfig {
environment: m
.value_of("EVENT_STREAM_ENVIRONMENT")
.map(str::to_string)
.expect("Required option for EventStream feature"),
application: m
.value_of("EVENT_STREAM_APPLICATION")
.map(str::to_string)
.expect("Required option for EventStream feature"),
meta: EventStreamMetadata::from_matches(m)?,
})
}
}

Expand Down Expand Up @@ -133,46 +136,55 @@ pub struct EventCore {
// map directly... hrmm
application: String,
environment: String,
meta: EventStreamMetadata,
meta: EventStreamMetadata,
}

impl EventCore {
pub fn new(config: EventStreamConfig, sys: &Sys) -> Self {
EventCore { supervisor_id: sys.member_id.clone(),
ip_address: sys.gossip_listen(),
environment: config.environment,
application: config.application,
meta: config.meta, }
EventCore {
supervisor_id: sys.member_id.clone(),
ip_address: sys.gossip_listen(),
environment: config.environment,
application: config.application,
meta: config.meta,
}
}
}

/// Send an event for the start of a Service.
pub fn service_started(service: &Service) {
if stream_initialized() {
publish(ServiceStartedEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None, });
publish(ServiceStartedEvent {
service_metadata: Some(service.to_service_metadata()),
event_metadata: None,
});
}
}

/// Send an event for the stop of a Service.
pub fn service_stopped(service: &Service) {
if stream_initialized() {
publish(ServiceStoppedEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None, });
publish(ServiceStoppedEvent {
service_metadata: Some(service.to_service_metadata()),
event_metadata: None,
});
}
}

pub fn health_check(service: &Service,
check_result: HealthCheck,
duration: Duration,
has_hook: bool) {
pub fn health_check(
service: &Service,
check_result: HealthCheck,
duration: Duration,
has_hook: bool,
) {
if stream_initialized() {
publish(HealthCheckEvent { service_metadata: Some(service.to_service_metadata()),
event_metadata: None,
result: Into::<types::HealthCheck>::into(check_result)
as i32,
duration: Some(duration.into()),
has_hook });
publish(HealthCheckEvent {
service_metadata: Some(service.to_service_metadata()),
event_metadata: None,
result: Into::<types::HealthCheck>::into(check_result) as i32,
duration: Some(duration.into()),
has_hook,
});
}
}

Expand All @@ -181,7 +193,9 @@ pub fn health_check(service: &Service,
/// Internal helper function to know whether or not to go to the trouble of
/// creating event structures. If the event stream hasn't been
/// initialized, then we shouldn't need to do anything.
fn stream_initialized() -> bool { EVENT_STREAM.try_get::<EventStream>().is_some() }
fn stream_initialized() -> bool {
EVENT_STREAM.try_get::<EventStream>().is_some()
}

/// Publish an event. This is the main interface that client code will
/// use.
Expand All @@ -201,9 +215,10 @@ fn publish(mut event: impl EventMessage) {
// one.
//
// The ugliness is at least contained, though.
event.event_metadata(EventMetadata { timestamp:
Some(std::time::SystemTime::now().into()),
..EVENT_CORE.get::<EventCore>().to_event_metadata() });
event.event_metadata(EventMetadata {
timestamp: Some(std::time::SystemTime::now().into()),
..EVENT_CORE.get::<EventCore>().to_event_metadata()
});

e.send(event.to_bytes());
}
Expand All @@ -228,24 +243,6 @@ impl EventStream {
/// All messages are published under this subject.
const HABITAT_SUBJECT: &str = "habitat";

/// Defines default connection information for a NATS Streaming server
/// running on localhost.
// TODO: As we become clear on the interaction between Habitat and A2,
// this implementation *may* disappear. It's useful for testing and
// prototyping, though.
impl Default for EventConnectionInfo {
fn default() -> Self {
EventConnectionInfo {
name: String::from("habitat"),
verbose: true,
cluster_uri: String::from("127.0.0.1:4223"),
cluster_id: String::from("test-cluster"),
// DON'T LEAVE THIS ADMIN TOKEN IN HERE!
auth_token: AutomateAuthToken("D6fHxsfc_FlGG4coaZXdNv-vSUM=".to_string()),
}
}
}

fn init_nats_stream(conn_info: EventConnectionInfo) -> Result<EventStream> {
// TODO (CM): Investigate back-pressure scenarios
let (event_tx, event_rx) = futures_mpsc::unbounded();
Expand All @@ -267,8 +264,6 @@ fn init_nats_stream(conn_info: EventConnectionInfo) -> Result<EventStream> {
} = conn_info;

let cc = ConnectCommand::builder()
// .user(Some("nats".to_string()))
// .pass(Some("S3Cr3TP@5w0rD".to_string()))
.name(Some(name))
.verbose(verbose)
.auth_token(Some(auth_token.as_str().to_string()))
Expand All @@ -284,7 +279,6 @@ fn init_nats_stream(conn_info: EventConnectionInfo) -> Result<EventStream> {
let publisher = NatsClient::from_options(opts)
.map_err(|e| {
error!("Error creating Nats Client from options: {}", e);
//Into::<NatsStreamingError>::into(e)
e.into()
})
.and_then(|client| {
Expand Down
8 changes: 5 additions & 3 deletions components/sup/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,10 @@ impl Manager {
let ec = EventCore::new(es_config, &sys);
// unwrap won't fail here; if there were an issue, from_env()
// would have already propagated an error up the stack.
let c = AutomateAuthToken::from_env().unwrap();
event::init_stream(Self::init_conn_info(&c), ec);
event::init_stream(
Self::init_conn_info(AutomateAuthToken::from_env().unwrap()),
ec,
);
}

Ok(Manager {
Expand Down Expand Up @@ -517,7 +519,7 @@ impl Manager {
/// Initialize struct containing the connection information required to
/// connect to the Automate messaging server: subject, verbosity, messaging
/// cluster id and uri, and authentication token.
fn init_conn_info(msg_auth_token: &AutomateAuthToken) -> event::EventConnectionInfo {
fn init_conn_info(msg_auth_token: AutomateAuthToken) -> event::EventConnectionInfo {
// Messaging connection information is hard-coded for now,
// with the exception of the Authomate authentication token.
// TODO: Determine what the actual connection parameters
Expand Down

0 comments on commit 5e810e3

Please sign in to comment.