diff --git a/Cargo.toml b/Cargo.toml index e521dbc..1f546ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,5 @@ ron = "0.8.1" serde = { version = "1.0.204", features = ["derive"] } spidev = "0.6.0" syslog = "^7.0" +tokio = { version = "1.39.2", features = ["full"] } +rumqttc = "0.24.0" diff --git a/conf.ron b/conf.ron index 2b0c390..47af5a3 100644 --- a/conf.ron +++ b/conf.ron @@ -4,7 +4,8 @@ Config( port: "1234", login: "admin", password: "verysecurepassword", - topic: "lora/sensor" + topic: "lora/sensor", + enabled: true, ), lora_config: LoRaConfig( mode: RX, diff --git a/src/config.rs b/src/config.rs index f419580..1dfdd50 100644 --- a/src/config.rs +++ b/src/config.rs @@ -27,6 +27,7 @@ pub struct MQTTConfig { pub login: String, pub password: String, pub topic: String, + pub enabled: bool, } #[derive(Debug, Deserialize, Serialize, Clone)] diff --git a/src/lora.rs b/src/lora.rs index c528152..6ff9d10 100644 --- a/src/lora.rs +++ b/src/lora.rs @@ -1,13 +1,13 @@ use core::time; -use std::thread::sleep; use crate::config::RadioConfig; use crate::defines::*; +use crate::mqtt::BlockingQueue; use crate::packet::{Data, DataType, Packet, BME280}; use crate::version_tag::{print_rusty_beagle, print_version_tag}; use crate::{GPIOPin, GPIOPinNumber, LoRaConfig, Mode}; -use anyhow::{anyhow, Context, Result}; -use gpiod::{Chip, Edge, EdgeDetect, Input, Lines, Options, Output}; +use anyhow::{anyhow, Context, Error, Result}; +use gpiod::{Chip, Lines, Options, Output, Input, Edge, EdgeDetect}; use log::{error, info}; use spidev::{SpiModeFlags, Spidev, SpidevOptions, SpidevTransfer}; @@ -43,12 +43,12 @@ pub struct MockEvent { } impl LoRa { - pub fn sleep(ms: u64) { - sleep(time::Duration::from_millis(ms)); + pub async fn sleep(ms: u64) { + tokio::time::sleep(time::Duration::from_millis(ms)).await; } #[cfg(target_arch = "x86_64")] - pub fn from_config(_lora_config: &LoRaConfig) -> Result { + pub async fn from_config(_lora_config: &LoRaConfig) -> Result { let mock_registers = [1; 112]; let dio0_pin = MockGPIO {}; let mode = _lora_config.mode.clone(); @@ -61,29 +61,29 @@ impl LoRa { } #[cfg(target_arch = "x86_64")] - pub fn spi_read_register(&mut self, register: LoRaRegister, value: &mut u8) -> Result<()> { + pub async fn spi_read_register(&mut self, register: LoRaRegister, value: &mut u8) -> Result<()> { *value = self.mock_registers[register as usize]; Ok(()) } #[cfg(target_arch = "x86_64")] - pub fn spi_write_register(&mut self, register: LoRaRegister, value: u8) -> Result<()> { + pub async fn spi_write_register(&mut self, register: LoRaRegister, value: u8) -> Result<()> { self.mock_registers[register as usize] = value; Ok(()) } #[cfg(target_arch = "x86_64")] - pub fn reset(&mut self) -> Result<()> { + pub async fn reset(&mut self) -> Result<()> { self.mock_registers = [1; 112]; // wait for 10 ms before using the chip - Self::sleep(10); + Self::sleep(10).await; Ok(()) } #[cfg(target_arch = "x86_64")] - pub fn config_dio(&mut self) -> Result<()> { + pub async fn config_dio(&mut self) -> Result<()> { Ok(()) } @@ -209,47 +209,47 @@ impl LoRa { Ok(()) } - pub fn standby_mode(&mut self) -> Result<()> { + pub async fn standby_mode(&mut self) -> Result<()> { self.spi_write_register( LoRaRegister::OP_MODE, LoRaMode::LONG_RANGE as u8 | LoRaMode::STDBY as u8, ) .context("LoRa::standby_mode")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn sleep_mode(&mut self) -> Result<()> { + pub async fn sleep_mode(&mut self) -> Result<()> { self.spi_write_register( LoRaRegister::OP_MODE, LoRaMode::LONG_RANGE as u8 | LoRaMode::SLEEP as u8, ) .context("LoRa::sleep_mode")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn receive_mode(&mut self) -> Result<()> { + pub async fn receive_mode(&mut self) -> Result<()> { self.spi_write_register( LoRaRegister::OP_MODE, LoRaMode::LONG_RANGE as u8 | LoRaMode::RX_CONTINUOUS as u8, ) .context("LoRa::recieve_mode")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn transmit_mode(&mut self) -> Result<()> { + pub async fn transmit_mode(&mut self) -> Result<()> { self.spi_write_register( LoRaRegister::OP_MODE, LoRaMode::LONG_RANGE as u8 | LoRaMode::TX as u8, ) .context("LoRa::transmit_mode")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn set_tx_power(&mut self, level: u8) -> Result<()> { + pub async fn set_tx_power(&mut self, level: u8) -> Result<()> { let correct_level = match level { 0 | 1 => 2, 2..=17 => level, @@ -260,11 +260,11 @@ impl LoRa { PAConfiguration::PA_BOOST as u8 | correct_level, ) .context("LoRa::set_tx_power")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn set_frequency(&mut self, frequency: u64) -> Result<()> { + pub async fn set_frequency(&mut self, frequency: u64) -> Result<()> { let frf = (frequency << 19) / 32_000_000; self.spi_write_register(LoRaRegister::FRF_MSB, (frf >> 16) as u8) .context("LoRa::set_frequency ")?; @@ -272,12 +272,12 @@ impl LoRa { .context("LoRa::set_frequency ")?; self.spi_write_register(LoRaRegister::FRF_LSB, frf as u8) .context("LoRa::set_frequency ")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn set_bandwidth(&mut self, bandwidth: Bandwidth) -> Result<()> { + pub async fn set_bandwidth(&mut self, bandwidth: Bandwidth) -> Result<()> { let mut value = 0x00; let register = LoRaRegister::MODEM_CONFIG_1; self.spi_read_register(register, &mut value) @@ -286,12 +286,12 @@ impl LoRa { let mask = 0x0f; self.spi_write_register(register, (value & mask) | ((bandwidth as u8) << 4)) .context("LoRa::set_bandwidth")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn set_coding_rate(&mut self, coding_rate: CodingRate) -> Result<()> { + pub async fn set_coding_rate(&mut self, coding_rate: CodingRate) -> Result<()> { let mut value = 0x00; let register = LoRaRegister::MODEM_CONFIG_1; self.spi_read_register(register, &mut value) @@ -301,12 +301,12 @@ impl LoRa { let cr = coding_rate as u8 - 4; self.spi_write_register(register, (value & mask) | (cr << 1)) .context("LoRa::set_coding_rate")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn set_spreading_factor(&mut self, spreading_factor: SpreadingFactor) -> Result<()> { + pub async fn set_spreading_factor(&mut self, spreading_factor: SpreadingFactor) -> Result<()> { let mut value = 0x00; let register = LoRaRegister::MODEM_CONFIG_2; self.spi_read_register(register, &mut value) @@ -319,12 +319,12 @@ impl LoRa { (value & reg_mask) | (((spreading_factor as u8) << 4) & val_mask), ) .context("LoRa::set_spreading_factor")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - pub fn enable_crc(&mut self) -> Result<()> { + pub async fn enable_crc(&mut self) -> Result<()> { let mut value = 0x00; let crc_on = 0x04; let register = LoRaRegister::MODEM_CONFIG_2; @@ -333,12 +333,12 @@ impl LoRa { self.spi_write_register(register, value | crc_on) .context("LoRa::enable_crc")?; - Self::sleep(10); + Self::sleep(10).await; Ok(()) } - - pub fn get_bandwidth(&mut self) -> Result { + + pub async fn get_bandwidth(&mut self) -> Result { let mut value = 0x00; self.spi_read_register(LoRaRegister::MODEM_CONFIG_1, &mut value) .context("LoRa::get_bandwidth")?; @@ -346,7 +346,7 @@ impl LoRa { Ok((value & 0xf0) >> 4) } - pub fn get_coding_rate(&mut self) -> Result { + pub async fn get_coding_rate(&mut self) -> Result { let mut value = 0x00; self.spi_read_register(LoRaRegister::MODEM_CONFIG_1, &mut value) .context("LoRa::get_coding_rate")?; @@ -354,7 +354,7 @@ impl LoRa { Ok(((value & 0x0e) >> 1) + 4) } - pub fn get_spreading_factor(&mut self) -> Result { + pub async fn get_spreading_factor(&mut self) -> Result { let mut value = 0x00; self.spi_read_register(LoRaRegister::MODEM_CONFIG_1, &mut value) .context("LoRa::get_spreading_factor")?; @@ -362,17 +362,28 @@ impl LoRa { Ok((value >> 4) + 8) } - pub fn config_radio(&mut self, radio_config: RadioConfig) -> Result<()> { - self.set_frequency(radio_config.frequency) - .context("LoRa::config_radio")?; - self.set_bandwidth(radio_config.bandwidth) + pub async fn get_frequency(&mut self) -> Result<[u8; 3]> { + let mut values: [u8; 3] = [0, 0, 0]; + self.spi_read_register(LoRaRegister::FRF_MSB, &mut values[0]) + .context("LoRa::get_frequency")?; + self.spi_read_register(LoRaRegister::FRF_MID, &mut values[1]) + .context("LoRa::get_frequency")?; + self.spi_read_register(LoRaRegister::FRF_LSB, &mut values[2]) + .context("LoRa::get_frequency")?; + + Ok(values) + } + + pub async fn config_radio(&mut self, radio_config: RadioConfig) -> Result<()> { + self.set_frequency(433_000_000).await.context("LoRa::config_radio")?; + self.set_bandwidth(radio_config.bandwidth).await .context("LoRa::config_radio")?; - self.set_coding_rate(radio_config.coding_rate) + self.set_coding_rate(radio_config.coding_rate).await .context("LoRa::config_radio")?; - self.set_spreading_factor(radio_config.spreading_factor) + self.set_spreading_factor(radio_config.spreading_factor).await .context("LoRa::config_radio")?; - self.enable_crc().context("LoRa::config_radio")?; - self.set_tx_power(radio_config.tx_power) + self.enable_crc().await.context("LoRa::config_radio")?; + self.set_tx_power(radio_config.tx_power).await .context("LoRa::config_radio")?; Ok(()) @@ -390,10 +401,10 @@ impl LoRa { Ok(()) } - pub fn receive_packet(&mut self, crc_error: &mut bool) -> Result> { + pub async fn receive_packet(&mut self, crc_error: &mut bool) -> Result> { let mut return_length = 0; - self.receive_mode().context("LoRa::receive_packet")?; + self.receive_mode().await.context("LoRa::receive_packet")?; loop { let dio0_event = self.dio0_pin.read_event().context("LoRa::receive_packet")?; @@ -411,7 +422,7 @@ impl LoRa { } } - self.standby_mode().context("LoRa::receive_packet")?; + self.standby_mode().await.context("LoRa::receive_packet")?; self.spi_read_register(LoRaRegister::RX_NB_BYTES, &mut return_length) .context("LoRa::receive_packet")?; @@ -429,7 +440,7 @@ impl LoRa { Ok(buffer) } - pub fn send_packet(&mut self, buffer: Vec) -> Result<()> { + pub async fn send_packet(&mut self, buffer: Vec) -> Result<()> { let mut tx_address = 0x00; self.spi_read_register(LoRaRegister::FIFO_TX_BASE_ADDR, &mut tx_address) .context("LoRa::send_packet")?; @@ -440,7 +451,7 @@ impl LoRa { .context("LoRa::send_packet")?; self.write_fifo(buffer).context("LoRa::send_packet")?; - self.transmit_mode().context("LoRa::send_packet")?; + self.transmit_mode().await.context("LoRa::send_packet")?; loop { let dio0_event = self.dio0_pin.read_event().context("LoRa::send_packet")?; @@ -452,32 +463,32 @@ impl LoRa { } } - self.sleep_mode().context("LoRa::send_packet")?; + self.sleep_mode().await.context("LoRa::send_packet")?; Ok(()) } - pub fn start(&mut self, radio_config: RadioConfig) -> Result<()> { - self.reset().context("LoRa::start")?; - self.sleep_mode().context("LoRa::start")?; - self.config_radio(radio_config).context("LoRa::start")?; - self.config_dio().context("LoRa::start")?; + pub async fn start(&mut self, radio_config: RadioConfig, queue: Option>) -> Result { + self.reset().await.context("LoRa::start")?; + self.sleep_mode().await.context("LoRa::start")?; + self.config_radio(radio_config).await.context("LoRa::start")?; + self.config_dio().await.context("LoRa::start")?; self.spi_write_register(LoRaRegister::MODEM_CONFIG_3, 0x04u8) .context("LoRa::start")?; print_rusty_beagle(); print_version_tag(); println!("+-------------------------+"); - println!("| Bandwidth: {} |", self.get_bandwidth().context("LoRa::start")?); - println!("| Coding rate: {} |", self.get_coding_rate().context("LoRa::start")?); - println!("| Spreading factor: {:02} |", self.get_spreading_factor().context("LoRa::start")?); + println!("| Bandwidth: {} |", self.get_bandwidth().await.context("LoRa::start")?); + println!("| Coding rate: {} |", self.get_coding_rate().await.context("LoRa::start")?); + println!("| Spreading factor: {:02} |", self.get_spreading_factor().await.context("LoRa::start")?); println!("| Mode: {:?} |", self.mode); println!("+-------------------------+"); - for _ in 0..10000 { + loop { match self.mode { Mode::RX => { let mut crc_error = false; - let received_buffer = match self.receive_packet(&mut crc_error) { + let received_buffer = match self.receive_packet(&mut crc_error).await { Ok(s) => s, Err(e) => { eprintln!("{:?}", e); @@ -496,24 +507,26 @@ impl LoRa { println!(); } - let packet = match Packet::new(&received_buffer) { + match Packet::new(&received_buffer) { Ok(packet) => { println!("Received: {:#?}", packet); if !crc_error { info!("Received: {:?}", packet); + if let Some(queue) = &queue { + queue.put(packet).await; + }; } - packet - } + }, Err(e) => { println!("Bad package: {:?}", e); println!(); println!("Received: {:02X?}", received_buffer); - self.sleep_mode().context("LoRa::start")?; + self.sleep_mode().await.context("LoRa::start")?; continue; } }; - self.sleep_mode().context("LoRa::start")?; + self.sleep_mode().await.context("LoRa::start")?; } Mode::TX => { let mut lna = 0x00; @@ -522,7 +535,7 @@ impl LoRa { self.spi_write_register(LoRaRegister::LNA, lna | 0x03) .context("LoRa::start")?; - self.standby_mode().context("LoRa::start")?; + self.standby_mode().await.context("LoRa::start")?; let packet = Packet { version: 0x33, @@ -536,39 +549,35 @@ impl LoRa { pressure: 56, }), }; - self.send_packet(packet.to_bytes()?) - .context("LoRa::start")?; - self.sleep_mode()?; - Self::sleep(2000); + self.send_packet(packet.to_bytes()?).await.context("LoRa::start")?; + self.sleep_mode().await?; + Self::sleep(2000).await; } } } - - self.reset().context("LoRa::start")?; - Ok(()) } #[cfg(target_arch = "arm")] - pub fn reset(&mut self) -> Result<()> { + pub async fn reset(&mut self) -> Result<()> { // pull NRST pin low for 5 ms self.reset_pin .set_values(0x00_u8) .context("LoRa::LoRa reset: while setting reset_pin low")?; - Self::sleep(5); + Self::sleep(5).await; self.reset_pin .set_values(0x01_u8) .context("LoRa::LoRa reset: while setting reset_pin high")?; // wait 10 ms before using the chip - Self::sleep(10); + Self::sleep(10).await; Ok(()) } #[cfg(target_arch = "arm")] - pub fn config_dio(&mut self) -> Result<()> { + pub async fn config_dio(&mut self) -> Result<()> { let mut initial_value = 0x00; self.spi_read_register(LoRaRegister::DIO_MAPPING_1, &mut initial_value) .context("LoRa::config_dio")?; @@ -636,8 +645,8 @@ mod tests { .is_ok()); } - #[test] - fn standby_mode_correct() { + #[tokio::test] + async fn standby_mode_correct() { let config = handle_error!(Config::from_file("./conf.ron".to_string())); let mut lora = match LoRa::from_config(&config.lora_config) { Ok(lora) => lora, @@ -647,15 +656,15 @@ mod tests { } }; - handle_error!(lora.standby_mode()); + handle_error!(lora.standby_mode().await); let mut mode: u8 = 0x00; handle_error!(lora.spi_read_register(LoRaRegister::OP_MODE, &mut mode)); assert_eq!((LoRaMode::LONG_RANGE as u8 | LoRaMode::STDBY as u8), mode); } - #[test] - fn sleep_mode_correct() { + #[tokio::test] + async fn sleep_mode_correct() { let config = handle_error!(Config::from_file("./conf.ron".to_string())); let mut lora = match LoRa::from_config(&config.lora_config) { Ok(lora) => lora, @@ -665,15 +674,15 @@ mod tests { } }; - handle_error!(lora.sleep_mode()); + handle_error!(lora.sleep_mode().await); let mut mode: u8 = 0x00; handle_error!(lora.spi_read_register(LoRaRegister::OP_MODE, &mut mode)); assert_eq!((LoRaMode::LONG_RANGE as u8 | LoRaMode::SLEEP as u8), mode); } - #[test] - fn receive_mode_correct() { + #[tokio::test] + async fn receive_mode_correct() { let config = handle_error!(Config::from_file("./conf.ron".to_string())); let mut lora = match LoRa::from_config(&config.lora_config) { Ok(lora) => lora, @@ -683,7 +692,7 @@ mod tests { } }; - handle_error!(lora.receive_mode()); + handle_error!(lora.receive_mode().await); let mut mode: u8 = 0x00; handle_error!(lora.spi_read_register(LoRaRegister::OP_MODE, &mut mode)); @@ -693,8 +702,8 @@ mod tests { ); } - #[test] - fn transmit_mode_correct() { + #[tokio::test] + async fn transmit_mode_correct() { let config = handle_error!(Config::from_file("./conf.ron".to_string())); let mut lora = match LoRa::from_config(&config.lora_config) { Ok(lora) => lora, @@ -704,7 +713,7 @@ mod tests { } }; - handle_error!(lora.transmit_mode()); + handle_error!(lora.transmit_mode().await); let mut mode: u8 = 0x00; handle_error!(lora.spi_read_register(LoRaRegister::OP_MODE, &mut mode)); diff --git a/src/main.rs b/src/main.rs index f769b6e..2cf8c9e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod defines; mod logging; mod lora; mod packet; +mod mqtt; mod version_tag; extern crate log; @@ -16,11 +17,17 @@ pub use crate::logging::start_logger; use bme280::BME280Sensor; use log::{error, info}; use lora::LoRa; +use mqtt::BlockingQueue; +use mqtt::Mqtt; +use packet::DataType; +use packet::Packet; +use tokio::time::sleep; +use std::sync::Arc; use std::env; -use std::thread; use std::time::Duration; -macro_rules! handle_error { + +macro_rules! handle_error_exit { ($func:expr) => { match $func { Err(e) => { @@ -33,6 +40,20 @@ macro_rules! handle_error { }; } +macro_rules! handle_error_continue { + ($func:expr) => { + match $func { + Err(e) => { + eprintln!("{:?}", e); + error!("{:?}", e); + continue; + } + Ok(s) => s, + } + }; +} + + fn parse_args() -> String { let args: Vec = env::args().collect(); @@ -48,24 +69,26 @@ fn parse_args() -> String { } } -fn main() { +#[tokio::main] +async fn main() { start_logger(); let config_path = parse_args(); - let config = handle_error!(Config::from_file(config_path)); + let config = handle_error_exit!(Config::from_file(config_path)); let radio_config = config.lora_config.radio_config.clone(); + let mqtt_config = config.mqtt_config.clone(); let bme_config: BME280Config = config.bme_config.clone(); if bme_config.enabled { - thread::spawn(move || { + tokio::spawn(async move { let measurement_interval = bme_config.measurement_interval; - let mut bme280 = handle_error!(BME280Sensor::new(bme_config)); + let mut bme280 = handle_error_exit!(BME280Sensor::new(bme_config)); loop { if let Err(e) = bme280.print() { error!("Failed to print BME280 sensor measurements: {:?}", e); } - thread::sleep(Duration::from_secs(measurement_interval)); + sleep(Duration::from_secs(measurement_interval)).await; } }); } @@ -81,5 +104,36 @@ fn main() { std::process::exit(-1); } }; - handle_error!(lora.start(radio_config)); + + let lora_queue; + + if mqtt_config.enabled { + let queue = BlockingQueue::new(128); + lora_queue = Some(queue.clone()); + let mqtt_queue = queue.clone(); + + let mqtt = Arc::new(handle_error_exit!(Mqtt::new(mqtt_config.clone()).await)); + let mqtt_clone = Arc::clone(&mqtt); + tokio::spawn(async move { + let mqtt_config = mqtt_config; + loop { + let packet: Packet = mqtt_queue.take().await; + let msg = handle_error_continue!(packet.to_json()); + match packet.data_type { + DataType::BME280 => { + handle_error_continue!(mqtt_clone.publish(&mqtt_config.topic, &msg).await) + }, + _ => continue, + } + } + }); + } else { + lora_queue = None; + } + + let lora_handle = tokio::spawn(async move { + handle_error_exit!(lora.start(radio_config, lora_queue).await); + }); + + handle_error_exit!(lora_handle.await); } diff --git a/src/mqtt.rs b/src/mqtt.rs new file mode 100644 index 0000000..00e71e1 --- /dev/null +++ b/src/mqtt.rs @@ -0,0 +1,94 @@ +use std::{collections::VecDeque, time::Duration}; +use std::sync::Arc; +use anyhow::{Context, Result}; +use log::{error, info}; +use tokio::sync::{Mutex, Notify}; +use rumqttc::{AsyncClient, MqttOptions, QoS}; +use crate::config::MQTTConfig; + +pub struct BlockingQueue { + queue: Arc>>, + notify: Arc, + capacity: usize, +} + +impl BlockingQueue { + pub fn new(capacity: usize) -> Self { + BlockingQueue { + queue: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))), + notify: Arc::new(Notify::new()), + capacity, + } + } + + pub async fn put(&self, item: T) { + let mut queue = self.queue.lock().await; + + // Wait if the queue is full + while queue.len() >= self.capacity { + drop(queue); // Release the lock before waiting + self.notify.notified().await; // Wait until notified + queue = self.queue.lock().await; // Re-acquire the lock + } + + queue.push_back(item); + self.notify.notify_one(); // Notify one waiting task + } + + pub async fn take(&self) -> T { + let mut queue = self.queue.lock().await; + + // Wait if the queue is empty + while queue.is_empty() { + drop(queue); // Release the lock before waiting + self.notify.notified().await; // Wait until notified + queue = self.queue.lock().await; // Re-acquire the lock + } + + let item = queue.pop_front().expect("Queue should not be empty"); + self.notify.notify_one(); // Notify one waiting task + item + } +} + +impl Clone for BlockingQueue { + fn clone(&self) -> Self { + BlockingQueue { + queue: Arc::clone(&self.queue), + notify: Arc::clone(&self.notify), + capacity: self.capacity, + } + } +} + +pub struct Mqtt { + client: AsyncClient, +} + +impl Mqtt { + pub async fn new(mqtt_config: MQTTConfig) -> Result { + let mut options = MqttOptions::new("RustyBeagle", mqtt_config.ip, mqtt_config.port.parse().context("Mqtt::new")?); + options.set_credentials(mqtt_config.login, mqtt_config.password); + options.set_keep_alive(Duration::from_secs(5)); + + let (client, mut event_loop) = AsyncClient::new(options, 10); + tokio::spawn(async move { + loop { + match event_loop.poll().await { + Ok(m) => info!("MQTT: {:?}", m), + Err(e) => { + eprintln!("{:?}", e); + error!("MQTT: {:?}", e); + } + + } + } + }); + Ok(Self {client}) + } + + pub async fn publish(&self, topic: &str, msg: &str) -> Result<()> { + self.client.publish(topic, QoS::AtLeastOnce, false, msg).await?; + Ok(()) + } +} diff --git a/src/packet.rs b/src/packet.rs index c2eeb58..6362995 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,9 +1,8 @@ use core::fmt; - use crate::conversions::*; use anyhow::{anyhow, Context, Result}; use serde::{Deserialize, Serialize}; -use std::hash::Hash; +use std::{fmt::format, hash::{DefaultHasher, Hash, Hasher}, time::SystemTime}; pub const DATA_SIZE: usize = 59; pub const META_DATA_SIZE: usize = 5; @@ -47,60 +46,6 @@ pub enum Data { Sms(String), } -trait GetData { - fn get_data(&self) -> Option<&T>; -} - -impl GetData for Data { - fn get_data(&self) -> Option<&BME280> { - if let Data::Bme280(ref inner) = *self { - Some(inner) - } else { - None - } - } -} - -impl GetData for Data { - fn get_data(&self) -> Option<&BMA400> { - if let Data::Bma400(ref inner) = *self { - Some(inner) - } else { - None - } - } -} - -impl GetData for Data { - fn get_data(&self) -> Option<&MQ2> { - if let Data::Mq2(ref inner) = *self { - Some(inner) - } else { - None - } - } -} - -impl GetData for Data { - fn get_data(&self) -> Option<&Gps> { - if let Data::Gps(ref inner) = *self { - Some(inner) - } else { - None - } - } -} - -impl GetData for Data { - fn get_data(&self) -> Option<&String> { - if let Data::Sms(ref inner) = *self { - Some(inner) - } else { - None - } - } -} - #[derive(Debug, Deserialize, Serialize, Hash)] pub struct BME280 { pub temperature: u8, @@ -280,6 +225,17 @@ impl Packet { packet.append(&mut data); Ok(packet) } + + pub fn to_json(&self) -> Result { + let time_stamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; + match &self.data { + Data::Bme280(data) => Ok(format!(r#"{{ "time": {:?}, "temperature": {}, "humidity": {}, "pressure": {} }}"#, time_stamp, data.temperature, data.humidity, data.pressure)), + Data::Bma400(data) => Ok(format!(r#"{{ "time": {:?}, "x": {}, "y": {}, "z": {} }}"#, time_stamp, data.x, data.y, data.z)), + Data::Mq2(data) => Ok(format!(r#"{{ "time": {:?}, "gas_type": {}, "value": {} }}"#, time_stamp, data.gas_type, data.value)), + Data::Gps(data) => Ok(format!(r#"{{ "time": {:?}, "status": {}, "altitude": {}, "latitude": {}, "longitude": {} }}"#, time_stamp, data.status, data.altitude, data.latitude, data.longitude)), + Data::Sms(data) => Ok(format!(r#"{{ "time": {:?}, "text": "{}" }}"#, time_stamp, *data)), + } + } } #[cfg(test)] diff --git a/src/version_tag.rs b/src/version_tag.rs index c38b157..854183d 100644 --- a/src/version_tag.rs +++ b/src/version_tag.rs @@ -16,11 +16,25 @@ pub fn print_rusty_beagle() { } pub fn print_version_tag() { - println!(" ▄▄▄▄▄ ██ █▀▄▀█ ███ ██ ▄▄▄▄▄ █ ▄▄ ▄█ ██▄ ▄███▄ █▄▄▄▄ "); - println!(" █ ▀▄ █ █ █ █ █ █ █ █ █ █ ▀▄ █ █ ██ █ █ █▀ ▀ █ ▄▀ "); - println!("▄ ▀▀▀▀▄ █▄▄█ █ ▄ █ █ ▀ ▄ █▄▄█ ▄ ▀▀▀▀▄ █▀▀▀ ██ █ █ ██▄▄ █▀▀▌ "); - println!(" ▀▄▄▄▄▀ █ █ █ █ █ ▄▀ █ █ ▀▄▄▄▄▀ █ ▐█ █ █ █▄ ▄▀ █ █ "); - println!(" █ █ ███ █ █ ▐ ███▀ ▀███▀ █ "); - println!(" █ ▀ █ ▀ ▀ "); - println!(" ▀ ▀ "); + println!(" _______ "); + println!(" (, /' "); + println!(" /' "); + println!(" /'____ ____ ____ "); + println!(" _ /'/' ) ' _/'' _/'/' /"); + println!(" /' ` /'/' /' _/' _/' /' /' "); + println!("(_____,/' (___,/(___/'__,_/'__,(___,/(__ "); + println!(" /' "); + println!(" / /' "); + println!(" (___,/' "); + println!(" _______ "); + println!(" (, /' /' "); + println!(" /' /' "); + println!(" /'____ . . , , ____ /'. . , , O ,____ ____ "); + println!(" _ /'/' )| |/ / /' ) /' | |/ / /' /' ) /' )"); + println!(" /' ` /'/(___,/' | /| /' /(___,/' /' | /| /' /' /' /' /' /' "); + println!("(_____,/' (_________|/' |/(__(________(__ _|/' |/(__(__/' /(__(___,/(__ "); + println!(" /' "); + println!(" / /' "); + println!(" (___,/' "); + println!(); }