Skip to content

Commit

Permalink
#59 every 30 seconds, read the temperatures from a queue and write th…
Browse files Browse the repository at this point in the history
…em to the RRD. The queue is added to on every temperature upload
  • Loading branch information
tobyweston committed Feb 17, 2018
1 parent d061c42 commit 3022127
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 42 deletions.
2 changes: 1 addition & 1 deletion deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@

sbt assembly && ansible -i ansible/inventory temperatures -m copy -a "src=target/scala-2.12/temperature-machine-2.1.jar dest=/home/pi/code/temperature-machine/target/scala-2.12" -u pi

#ansible -i ansible/inventory temperatures -a reboot --become -u pi
ansible -i ansible/inventory temperatures -a reboot --become -u pi
6 changes: 4 additions & 2 deletions src/main/scala/bad/robot/temperature/rrd/RrdUpdate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ case class RrdUpdate(monitored: List[Host], measurement: Measurement) {
fromTryCatchNonFatal {
val database = new RrdDb(RrdFile.file)
val sample = database.createSample()
val temperatures = UnknownValues.patch(monitored.indexOf(measurement.host) * RrdFile.MaxSensors, measurement.temperatures, measurement.temperatures.size)
val indexOf = monitored.indexOf(measurement.host)
Log.debug(s"Index of ${measurement.host.name } is $indexOf (-1 means it couldn't be found)")
val temperatures = UnknownValues.patch(indexOf * RrdFile.MaxSensors, measurement.temperatures, measurement.temperatures.size)
sample.setValues(database, measurement.time, temperatures.map(_.temperature.celsius): _*)
Log.debug(s"rrd -> ${measurement.time} ${temperatures.map(t => t.name + " " + t.temperature.celsius).mkString(", ")}")
Log.debug(s"rrd -> ${measurement.host.name} ${measurement.time} ${temperatures.map(t => t.name + " " + t.temperature.celsius).mkString(", ")}")
database.close()
}.leftMap(error => {
RrdError(messageOrStackTrace(error))
Expand Down
26 changes: 26 additions & 0 deletions src/main/scala/bad/robot/temperature/server/AllTemperatures.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package bad.robot.temperature.server

import bad.robot.temperature.Measurement

object AllTemperatures {
def apply(): AllTemperatures = new AllTemperatures()
}

class AllTemperatures {

private var measurements: List[Measurement] = List()

def put(measurement: Measurement) = {
synchronized {
measurements = measurement :: measurements
}
}

def drain(): List[Measurement] = {
synchronized {
val copy = measurements
measurements = List()
copy
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import bad.robot.temperature.rrd.Host

import scala.collection.concurrent.TrieMap

case class Temperatures(clock: Clock) {
case class CurrentTemperatures(clock: Clock) {

private val temperatures: TrieMap[Host, Measurement] = TrieMap()

Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/bad/robot/temperature/server/HttpServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import org.http4s.server.middleware.CORS
import org.http4s.server.{Server => Http4sServer}

object HttpServer {
def apply(port: Int, monitored: List[Host], temperatures: Temperatures): IO[HttpServer] = IO {
def apply(port: Int, monitored: List[Host], current: CurrentTemperatures, all: AllTemperatures): IO[HttpServer] = IO {
val server = new HttpServer(port, monitored)
server.build(temperatures).unsafeRunSync
server.build(current, all).unsafeRunSync
server
}
}
Expand All @@ -37,15 +37,15 @@ class HttpServer(port: Int, monitored: List[Host]) {
newFixedThreadPool(max(4, Runtime.getRuntime.availableProcessors), TemperatureMachineThreadFactory("http-server"))
}

private def build(temperatures: Temperatures): IO[Http4sServer[IO]] = BlazeBuilder[IO]
private def build(current: CurrentTemperatures, all: AllTemperatures): IO[Http4sServer[IO]] = BlazeBuilder[IO]
.withExecutionContext(scala.concurrent.ExecutionContext.fromExecutorService(DefaultExecutorService))
.bindHttp(port, "0.0.0.0")
.mountService(services(temperatures), "/")
.mountService(services(current, all), "/")
.start

private def services(temperatures: Temperatures): HttpService[IO] = {
private def services(current: CurrentTemperatures, all: AllTemperatures): HttpService[IO] = {
CORS(
TemperatureEndpoint(SensorReader(Host.local, SensorFile.find()), temperatures) <+>
TemperatureEndpoint(SensorReader(Host.local, SensorFile.find()), current, all) <+>
ConnectionsEndpoint(Clock.systemDefaultZone) <+>
LogEndpoint() <+>
ExportEndpoint(JsonFile.load, JsonToCsv.DefaultTimeFormatter) <+>
Expand Down
15 changes: 8 additions & 7 deletions src/main/scala/bad/robot/temperature/server/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,25 @@ object Server extends App {
} yield ()
}

def http(temperatures: Temperatures)(implicit monitored: List[Host]): IO[HttpServer] = {
def http(current: CurrentTemperatures, all: AllTemperatures)(implicit monitored: List[Host]): IO[HttpServer] = {
val port = 11900
for {
server <- HttpServer(port, monitored, temperatures)
server <- HttpServer(port, monitored, current, all)
_ <- info(s"HTTP Server started on http://${InetAddress.getLocalHost.getHostAddress}:$port")
_ <- server.awaitShutdown()
} yield server
}

def server(temperatures: Temperatures, sensors: List[SensorFile])(implicit monitored: List[Host]) = {
def server(current: CurrentTemperatures, all: AllTemperatures, sensors: List[SensorFile])(implicit monitored: List[Host]) = {
for {
_ <- info("Starting temperature-machine (server mode)...")
_ <- init(monitored)
_ <- discovery
_ <- gather(temperatures, ErrorOnTemperatureSpike(Rrd(monitored)))
_ <- gather(all, ErrorOnTemperatureSpike(Rrd(monitored)))
_ <- record(Host.local, sensors, HttpUpload(InetAddress.getLocalHost, BlazeHttpClient()))
_ <- graphing
_ <- exportJson
_ <- http(temperatures)
_ <- http(current, all)
} yield ()
}

Expand All @@ -58,8 +58,9 @@ object Server extends App {
case hosts => hosts.map(host => Host(host, utcOffset = None))
}

private val temperatures = Temperatures(Clock.systemDefaultZone)
private val current = CurrentTemperatures(Clock.systemDefaultZone)
private val all = new AllTemperatures()

findSensorsAndExecute(server(temperatures, _)(hosts)).leftMap(error => Log.error(error.message))
findSensorsAndExecute(server(current, all, _)(hosts)).leftMap(error => Log.error(error.message))

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object TemperatureEndpoint {
}


def apply(sensors: TemperatureReader, temperatures: Temperatures) = HttpService[IO] {
def apply(sensors: TemperatureReader, current: CurrentTemperatures, all: AllTemperatures) = HttpService[IO] {
// todo delete this one, it shouldn't be used
case GET -> Root / "temperature" => {
sensors.read.toHttpResponse(measurement => {
Expand All @@ -29,15 +29,15 @@ object TemperatureEndpoint {
}

case GET -> Root / "temperatures" / "average" => {
Ok(encode(temperatures.average))
Ok(encode(current.average))
}

case GET -> Root / "temperatures" => {
Ok(encode(temperatures.all))
Ok(encode(current.all))
}

case DELETE -> Root / "temperatures" => {
temperatures.clear()
current.clear()
NoContent()
}

Expand All @@ -46,7 +46,8 @@ object TemperatureEndpoint {
val result = ConnectionsEndpoint.update(measurement.host, request.headers.get(`X-Forwarded-For`))

result.toHttpResponse(_ => {
temperatures.updateWith(measurement)
current.updateWith(measurement)
all.put(measurement)
NoContent()
})
})
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/bad/robot/temperature/task/GenerateGraph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import bad.robot.temperature.rrd._
import bad.robot.temperature.rrd.Seconds.now

import scala.concurrent.duration.Duration
import bad.robot.logging._

case class GenerateGraph(period: Duration)(implicit hosts: List[Host]) extends Runnable {
def run(): Unit = {
val currentTime = now()
Log.debug(s"Generating RRD chart for last $period")
Graph.create(currentTime - period.toSeconds, currentTime, hosts)
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/bad/robot/temperature/task/IOs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import bad.robot.logging._
import bad.robot.temperature.ds18b20.{SensorFile, SensorReader}
import bad.robot.temperature.rrd.RrdFile.MaxSensors
import bad.robot.temperature.rrd.{Host, RrdFile}
import bad.robot.temperature.server.Temperatures
import bad.robot.temperature.server.AllTemperatures
import bad.robot.temperature.task.Scheduler.ScheduledExecutorServiceOps
import bad.robot.temperature.{JsonExport, TemperatureWriter, XmlExport}
import cats.effect.IO
Expand All @@ -24,7 +24,7 @@ object IOs {
}
}

def gather(temperatures: Temperatures, destination: TemperatureWriter) = {
def gather(temperatures: AllTemperatures, destination: TemperatureWriter) = {
val frequency = 30 seconds
val executor = newSingleThreadScheduledExecutor(TemperatureMachineThreadFactory("rrd-writing-thread"))
for {
Expand Down
20 changes: 13 additions & 7 deletions src/main/scala/bad/robot/temperature/task/RecordTemperature.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package bad.robot.temperature.task

import bad.robot.temperature.server.Temperatures
import bad.robot.temperature.{Error, TemperatureReader, TemperatureWriter}
import bad.robot.temperature.server.AllTemperatures
import bad.robot.temperature.{Error, Measurement, TemperatureReader, TemperatureWriter}
import org.apache.logging.log4j.Logger

case class RecordTemperature(input: TemperatureReader, output: TemperatureWriter, log: Logger) extends Runnable {
Expand All @@ -14,12 +14,18 @@ case class RecordTemperature(input: TemperatureReader, output: TemperatureWriter
}
}

// Doesn't fix the problem where multiple measurements have the same time. Really need to drain these from a queue
case class RecordTemperatures(temperatures: Temperatures, output: TemperatureWriter, log: Logger) extends Runnable {
/**
* Doesn't fix the problem where multiple measurements have the same time.
*
* Could also smash measurements taken at the same time together or filter out those that are within a second of each other
*/
case class RecordTemperatures(temperatures: AllTemperatures, output: TemperatureWriter, log: Logger) extends Runnable {
def run(): Unit = {
temperatures.all.foreach { case (host, measurement) =>
output.write(measurement).leftMap(error => log.error(s"${host.name} ${error.toString}")); ()
}
temperatures.drain().sorted(timeAscending).foreach(measurement =>
output.write(measurement).leftMap(error => log.error(s"${measurement.host} ${error.toString}"))
)
}

private def timeAscending: Ordering[Measurement] = (x: Measurement, y: Measurement) => x.time.compareTo(y.time)
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ import cats.effect.IO
class HttpServerTest extends Specification {

"When the Http server has been started" >> {
val temperatures = Temperatures(Clock.systemDefaultZone)
val server = HttpServer(8080, List(Host("example", None)), temperatures).unsafeRunSync
val current = CurrentTemperatures(Clock.systemDefaultZone)
val all = AllTemperatures()
val server = HttpServer(8080, List(Host("example", None)), current, all).unsafeRunSync
val client = Http1Client[IO](config = defaultConfig.copy(idleTimeout = 30 minutes, responseHeaderTimeout = 30 minutes)).unsafeRunSync()

// todo wait for server to startup, not sure how.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class TemperatureEndpointTest extends Specification {
"Averages a single temperature" >> {
val request = Request[IO](GET, Uri.uri("/temperature"))
val reader = stubReader(\/-(List(SensorReading("28-0002343fd", Temperature(56.34)))))
val service = TemperatureEndpoint(reader, Temperatures(Clock.systemDefaultZone))
val service = TemperatureEndpoint(reader, CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
val response = service.orNotFound.run(request).unsafeRunSync

response.status must_== Ok
Expand All @@ -39,7 +39,7 @@ class TemperatureEndpointTest extends Specification {
SensorReading("28-0000d34c3", Temperature(25.344)),
SensorReading("28-0000d34c3", Temperature(23.364)),
SensorReading("28-0000d34c3", Temperature(21.213))
))), Temperatures(Clock.systemDefaultZone))
))), CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
val response = service.orNotFound.run(request).unsafeRunSync

response.status must_== Ok
Expand All @@ -48,15 +48,15 @@ class TemperatureEndpointTest extends Specification {

"General Error reading temperatures" >> {
val request = Request[IO](GET, Uri.uri("/temperature"))
val service = TemperatureEndpoint(stubReader(-\/(SensorError("An example error"))), Temperatures(Clock.systemDefaultZone))
val service = TemperatureEndpoint(stubReader(-\/(SensorError("An example error"))), CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
val response = service.orNotFound.run(request).unsafeRunSync

response.status must_== InternalServerError
response.as[String].unsafeRunSync must_== "An example error"
}

"Put some temperature data" >> {
val service = TemperatureEndpoint(stubReader(\/-(List())), Temperatures(Clock.systemDefaultZone))
val service = TemperatureEndpoint(stubReader(\/-(List())), CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
val measurement = """{
| "host" : {
| "name" : "localhost",
Expand All @@ -77,7 +77,7 @@ class TemperatureEndpointTest extends Specification {
}

"Bad json when writing sensor data" >> {
val service = TemperatureEndpoint(stubReader(\/-(List())), Temperatures(Clock.systemDefaultZone))
val service = TemperatureEndpoint(stubReader(\/-(List())), CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
val request: Request[IO] = Put("bad json")
val response = service.orNotFound.run(request).unsafeRunSync
response must haveStatus(org.http4s.Status.BadRequest)
Expand Down Expand Up @@ -131,7 +131,7 @@ class TemperatureEndpointTest extends Specification {
|}""".stripMargin


val service = TemperatureEndpoint(stubReader(\/-(List())), Temperatures(Clock.systemDefaultZone))
val service = TemperatureEndpoint(stubReader(\/-(List())), CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
service.orNotFound.run(Request[IO](DELETE, Uri.uri("/temperatures"))).unsafeRunSync
service.orNotFound.run(Put(measurement1)).unsafeRunSync
service.orNotFound.run(Put(measurement2)).unsafeRunSync
Expand Down Expand Up @@ -238,7 +238,7 @@ class TemperatureEndpointTest extends Specification {
|}""".stripMargin


val service = TemperatureEndpoint(stubReader(\/-(List())), Temperatures(Clock.systemDefaultZone))
val service = TemperatureEndpoint(stubReader(\/-(List())), CurrentTemperatures(Clock.systemDefaultZone), AllTemperatures())
service.orNotFound.run(Request[IO](DELETE, Uri.uri("/temperatures"))).unsafeRunSync
service.orNotFound.run(Put(measurement1)).unsafeRunSync
service.orNotFound.run(Put(measurement2)).unsafeRunSync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TemperaturesTest extends Specification {
SensorReading("28-000003dd3433", Temperature(22.8))
))

val temperatures = Temperatures(fixedClock(Instant.ofEpochSecond(300)))
val temperatures = CurrentTemperatures(fixedClock(Instant.ofEpochSecond(300)))
temperatures.updateWith(measurement1)
temperatures.updateWith(measurement2)

Expand All @@ -42,7 +42,7 @@ class TemperaturesTest extends Specification {
SensorReading("28-000003dd3433", Temperature(22.8))
))

val temperatures = Temperatures(fixedClock(Instant.ofEpochSecond(300)))
val temperatures = CurrentTemperatures(fixedClock(Instant.ofEpochSecond(300)))
temperatures.updateWith(measurement1)
temperatures.updateWith(measurement2)

Expand Down

0 comments on commit 3022127

Please sign in to comment.