Skip to content

Commit

Permalink
Beam 2.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Aug 3, 2018
1 parent 385ac46 commit 54807c9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import sbt._
import sbt.Keys._
import sbtrelease.ReleasePlugin.autoImport.ReleaseTransformations._

val beamVersion = "2.4.0"
val beamVersion = "2.5.0"
val autoValueVersion = "1.5.3"
val slf4jVersion = "1.7.25"

Expand Down
14 changes: 7 additions & 7 deletions dbeam-core/src/main/scala/com/spotify/dbeam/MetricsHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,27 @@ object MetricsHelper {
def getMetrics(result: PipelineResult): Map[String, Long] = {
val metrics: MetricQueryResults = result.metrics().queryMetrics(MetricsFilter.builder().build())

val gauges = metricsAtSteps(metrics.gauges().asScala)
.map{ case (k: MetricName, v: Map[String, MetricValue[GaugeResult]]) => (k.name(),
val gauges = metricsAtSteps(metrics.getGauges.asScala)
.map{ case (k: MetricName, v: Map[String, MetricValue[GaugeResult]]) => (k.getName(),
v.values.map(_.committed.getOrElse(GaugeResult.empty())).reduce(
(x: GaugeResult, y: GaugeResult) =>
if (x.timestamp() isAfter y.timestamp()) x else y).value()
if (x.getTimestamp isAfter y.getTimestamp) x else y).getValue
)}
val counters = metricsAtSteps(
metrics.counters().asScala.asInstanceOf[Iterable[MetricResult[Long]]])
metrics.getCounters.asScala.asInstanceOf[Iterable[MetricResult[Long]]])
.map{
case (k: MetricName, v: Map[String, MetricValue[Long]]) =>
(k.name(), reduceMetricValues(v))}
(k.getName(), reduceMetricValues(v))}
(gauges.toSeq ++ counters.toSeq).toMap
}

private def metricsAtSteps[T](results: Iterable[MetricResult[T]])
: Map[MetricName, Map[String, MetricValue[T]]] =
results
.groupBy(_.name())
.groupBy(_.getName())
.mapValues { xs =>
val m: Map[String, MetricValue[T]] = xs.map { r =>
r.step() -> MetricValue(r.attempted(), Try(r.committed()).toOption)
r.getStep -> MetricValue(r.getAttempted, Try(r.getCommitted).toOption)
} (scala.collection.breakOut)
m
}
Expand Down

0 comments on commit 54807c9

Please sign in to comment.