Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converts scala functions Span.timestamp, duration to vals #817

Merged
merged 1 commit into from
Nov 6, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import anorm.SqlParser._
import anorm._
import com.twitter.finagle.stats.{DefaultStatsReceiver, StatsReceiver}
import com.twitter.util._
import com.twitter.zipkin.adjuster.CorrectForClockSkew
import com.twitter.zipkin.adjuster.{ApplyTimestampAndDuration, CorrectForClockSkew}
import com.twitter.zipkin.common._
import com.twitter.zipkin.storage.anormdb.AnormThreads._
import com.twitter.zipkin.storage.anormdb.DB.byteArrayToStatement
Expand All @@ -33,7 +33,10 @@ class AnormSpanStore(val db: DB,
val stats: StatsReceiver = DefaultStatsReceiver.scope("AnormSpanStore")
) extends SpanStore with CollectAnnotationQueries with DBPool {

override def apply(spans: Seq[Span]) = Future.join(spans.map(storeSpan))
override def apply(spans: Seq[Span]) = Future.join(spans
.map(s => s.copy(annotations = s.annotations.sorted))
.map(ApplyTimestampAndDuration.apply)
.map(storeSpan))

private [this] def storeSpan(span: Span): Future[Unit] = inNewThread {
implicit val (conn, borrowTime) = borrowConn()
Expand Down Expand Up @@ -166,13 +169,14 @@ class AnormSpanStore(val db: DB,
val annotationType = AnnotationType.fromInt(binAnno.annotationTypeValue)
BinaryAnnotation(binAnno.key, value, annotationType, host)
}
Span(span.traceId, span.spanName, span.spanId, span.parentId, spanAnnos, spanBinAnnos, span.debug)
Span(span.traceId, span.spanName, span.spanId, span.parentId, None, None, spanAnnos, spanBinAnnos, span.debug)
}
}
// Redundant sort as List.groupBy loses order of values
results.groupBy(_.traceId)
.values.toList
.map(CorrectForClockSkew)
.map(ApplyTimestampAndDuration)
.sortBy(_.head) // sort traces by the first span
} finally {
returnConn(conn, borrowTime, "getSpansByTraceIds")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.zipkin.storage.cassandra
import com.twitter.conversions.time._
import com.twitter.finagle.stats.{DefaultStatsReceiver, StatsReceiver}
import com.twitter.util.{Future, Duration}
import com.twitter.zipkin.adjuster.{CorrectForClockSkew, MergeById}
import com.twitter.zipkin.adjuster.{ApplyTimestampAndDuration, CorrectForClockSkew, MergeById}
import com.twitter.zipkin.common.Span
import com.twitter.zipkin.conversions.thrift._
import com.twitter.zipkin.thriftscala.{Span => ThriftSpan}
Expand Down Expand Up @@ -188,6 +188,7 @@ abstract class CassandraSpanStore(
traceIds.flatMap(traceId => spans.get(traceId))
.map(MergeById)
.map(CorrectForClockSkew)
.map(ApplyTimestampAndDuration)
.sortBy(_.head) // CQL doesn't allow order by with an "in" query
}
}
Expand All @@ -201,7 +202,8 @@ abstract class CassandraSpanStore(
SpansStoredCounter.incr(spans.size)

Future.join(
spans map { span =>
spans.map(s => s.copy(annotations = s.annotations.sorted))
.map(ApplyTimestampAndDuration.apply).map { span =>
SpansIndexedCounter.incr()

Future.join(
Expand All @@ -210,7 +212,7 @@ abstract class CassandraSpanStore(
span.traceId,
span.timestamp.getOrElse(0L),
createSpanColumnName(span),
spanCodec.encode(span.copy(annotations = span.annotations.sorted).toThrift),
spanCodec.encode(span.toThrift),
spanTtl.inSeconds)),
indexServiceName(span),
indexSpanNameByService(span),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ import org.scalatest.{FunSuite, Matchers}
class SamplerFilterSpec extends FunSuite with Matchers with MockitoSugar {

test("let the span pass if debug flag is set") {
val span = Span(12345, "methodcall", 666, None, List(), Seq(), Some(true))
val span = Span(12345, "methodcall", 666, debug = Some(true))
val samplerProcessor = new SamplerFilter(NullGlobalSampler)

samplerProcessor(Seq(span)) should be (Seq(span))
}

test("let the span pass if debug flag false and sampler says yes") {
val span = Span(12345, "methodcall", 666, None, List(), Seq(), Some(false))
val span = Span(12345, "methodcall", 666, debug = Some(false))
val samplerProcessor = new SamplerFilter(EverythingGlobalSampler)

samplerProcessor(Seq(span)) should be (Seq(span))
}

test("not let the span pass if debug flag false and sampler says no") {
val span = Span(12345, "methodcall", 666, None, List(), Seq(), Some(false))
val span = Span(12345, "methodcall", 666, debug = Some(false))
val samplerProcessor = new SamplerFilter(NullGlobalSampler)

samplerProcessor(Seq(span)) should be (empty)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.twitter.zipkin.adjuster

import com.twitter.zipkin.common._

/**
* This applies timestamp and duration to spans, based on interpretation of
* annotations. Spans who already have timestamp and duration set and left
* alone.
*
* After application, spans without a timestamp are filtered out, as they are
* not possible to present on a timeline. The only scenario where this is
* possible is when instrumentation sends binary annotations ahead of the span
* start event, or when a span's start even was lost. Considering this is error
* -case or transient, there's no option to control this behavior.
*/
object ApplyTimestampAndDuration extends ((List[Span]) => List[Span]) {

override def apply(spans: List[Span]): List[Span] = spans.map { span =>
if (span.timestamp.isDefined && span.duration.isDefined) span else apply(span)
}.filter(_.timestamp.nonEmpty).sorted

def apply(span: Span) = {
val sorted = span.annotations.sorted
val firstOption = sorted.headOption.map(_.timestamp)
val lastOption = sorted.lastOption.map(_.timestamp)
span.copy(
timestamp = span.timestamp.orElse(firstOption),
duration = span.duration.orElse {
for (first <- firstOption; last <- lastOption; if (first != last))
yield last - first
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,14 +240,21 @@ object CorrectForClockSkew extends ((List[Span]) => List[Span]) {
Constants.LocalhostLoopBackIP == ep.ipv4

val span = spanTree.span
val annotations = span.annotations map { a =>
a.host match {
val annotations: List[Annotation] = span.annotations.map(a => a.host match {
case Some(ep) if isHost(ep, a.value) => a.copy(timestamp = a.timestamp - clockSkew.skew)
case _ => a
}
}
).sorted

new SpanTreeEntry(span.copy(annotations = annotations.sorted), spanTree.children)
// reset timestamp and duration as if there's skew, these will change.
val firstOption = annotations.headOption.map(_.timestamp)
val lastOption = annotations.lastOption.map(_.timestamp)
val duration = for (first <- firstOption; last <- lastOption; if (first != last))
yield last - first
new SpanTreeEntry(span.copy(
timestamp = firstOption,
duration = duration,
annotations = annotations.sorted), spanTree.children)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import scala.collection.mutable
/**
* Merge all the spans with the same id. This is used by span stores who store
* partial spans and need them collated at query time.
*
* After merge, spans without a timestamp are filtered out, as they are
* not possible to present on a timeline. The only scenario where this is
* possible is when instrumentation sends binary annotations ahead of the span
* start event, or when a span's start even was lost. Considering this is error
* -case or transient, there's no option to control this behavior.
*/
object MergeById extends ((Seq[Span]) => List[Span]) {

Expand All @@ -43,8 +37,6 @@ object MergeById extends ((Seq[Span]) => List[Span]) {
case None => spanMap.put(s.id, s)
}
})
spanMap.values
.filter(_.timestamp.nonEmpty)
.toList.sorted
spanMap.values.toList.sorted
}
}
26 changes: 12 additions & 14 deletions zipkin-common/src/main/scala/com/twitter/zipkin/common/Span.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.twitter.zipkin.common

import com.twitter.zipkin.Constants
import com.twitter.zipkin.adjuster.ApplyTimestampAndDuration
import com.twitter.zipkin.util.Util._

/**
Expand All @@ -32,6 +33,8 @@ import com.twitter.zipkin.util.Util._
* @param name name of span, can be rpc method name for example, in lowercase.
* @param id random long that identifies this span
* @param parentId reference to the parent span in the trace tree
* @param timestamp epoch microseconds of the start of this span. None when a partial span.
* @param duration microseconds comprising the critical path, if known.
* @param annotations annotations, containing a timestamp and some value. both user generated and
* some fixed ones from the tracing framework. Sorted ascending by timestamp
* @param binaryAnnotations binary annotations, can contain more detailed information such as
Expand All @@ -43,25 +46,14 @@ case class Span(
name: String,
id: Long,
parentId: Option[Long] = None,
timestamp: Option[Long] = None,
duration: Option[Long] = None,
annotations: List[Annotation] = List.empty,
binaryAnnotations: Seq[BinaryAnnotation] = Seq.empty,
debug: Option[Boolean] = None) extends Ordered[Span] {

checkArgument(name.toLowerCase == name, s"name must be lowercase: $name")

lazy val timestamp: Option[Long] = annotations.headOption.map(_.timestamp)

/**
* Duration in microseconds.
*
* Absent when this is span has only binary annotations or only a single
* annotation. This is possible when a span isn't complete, or messages that
* complete it were lost.
*/
def duration: Option[Long] =
for (first <- annotations.headOption; last <- annotations.lastOption; if (first != last))
yield last.timestamp - first.timestamp

override def compare(that: Span) =
java.lang.Long.compare(timestamp.getOrElse(0L), that.timestamp.getOrElse(0L))

Expand Down Expand Up @@ -97,11 +89,17 @@ case class Span(
case _ => name
}

val selectedTimestamp = Seq(timestamp, mergeFrom.timestamp).flatten.reduceOption(_ min _)
val selectedDuration = Trace.duration(List(this, mergeFrom))
.orElse(duration).orElse(mergeFrom.duration)

new Span(
traceId,
selectedName,
id,
parentId,
selectedTimestamp,
selectedDuration,
(annotations ++ mergeFrom.annotations).sorted,
binaryAnnotations ++ mergeFrom.binaryAnnotations,
if (debug.getOrElse(false) | mergeFrom.debug.getOrElse(false)) Some(true) else None
Expand All @@ -119,4 +117,4 @@ case class Span(
*/
def serverSideAnnotations: Seq[Annotation] =
annotations.filter(a => Constants.CoreServer.contains(a.value))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ object JsonSpan extends (Span => JsonSpan) {
s.name.toLowerCase,
id(s.id),
s.parentId.map(id(_)),
None,
None,
/** If deserialized with jackson, these could be null, as it doesn't look at default values. */
if (s.annotations == null) List.empty else s.annotations.map(JsonAnnotation.invert).sorted,
if (s.binaryAnnotations == null) Seq.empty else s.binaryAnnotations.map(JsonBinaryAnnotation.invert),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import java.nio.ByteBuffer

import com.twitter.util.FuturePools._
import com.twitter.util.{Closable, Future}
import com.twitter.zipkin.adjuster.{CorrectForClockSkew, MergeById}
import com.twitter.zipkin.adjuster.{ApplyTimestampAndDuration, CorrectForClockSkew, MergeById}
import com.twitter.zipkin.common.Span

abstract class SpanStore extends java.io.Closeable {
Expand Down Expand Up @@ -99,7 +99,9 @@ class InMemorySpanStore extends SpanStore with CollectAnnotationQueries {
override def close() = {}

override def apply(newSpans: Seq[Span]): Future[Unit] = call {
spans ++= newSpans.map(s => s.copy(annotations = s.annotations.sorted))
spans ++= newSpans
.map(s => s.copy(annotations = s.annotations.sorted))
.map(ApplyTimestampAndDuration.apply)
}.unit

override def getTracesByIds(traceIds: Seq[Long]): Future[Seq[List[Span]]] = call {
Expand All @@ -108,7 +110,8 @@ class InMemorySpanStore extends SpanStore with CollectAnnotationQueries {
.values.filter(!_.isEmpty).toList
.map(MergeById)
.map(CorrectForClockSkew)
.sortBy(_.head.timestamp)
.map(ApplyTimestampAndDuration)
.sortBy(_.head)
}

override def getTraceIdsByName(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.twitter.zipkin.adjuster

import com.twitter.zipkin.common._
import org.scalatest.FunSuite

class ApplyTimestampAndDurationTest extends FunSuite {

test("noop when no annotations") {
val span = Span(1, "n", 2, None, None, None)

val adjusted = ApplyTimestampAndDuration(span)
assert(adjusted.timestamp === None)
assert(adjusted.duration === None)
}

test("duration is difference between annotation timestamps") {
val span = Span(12345, "methodcall", 666, None, None, None, List(
Annotation(1, "value1", Some(Endpoint(1, 2, "service"))),
Annotation(2, "value2", Some(Endpoint(3, 4, "service"))),
Annotation(3, "value3", Some(Endpoint(5, 6, "service")))
))

val adjusted = ApplyTimestampAndDuration(span)
assert(adjusted.timestamp === Some(1))
assert(adjusted.duration === Some(2))
}

test("noop when duration already set") {
val span = Span(12345, "methodcall", 666, None, Some(83L), Some(11L), List(
Annotation(10, "value1", Some(Endpoint(1, 2, "service")))
))

val adjusted = ApplyTimestampAndDuration(span)
assert(adjusted === span)
}

test("duration isn't set when only one annotation") {
val span = Span(1, "n", 2, None, None, None, annotations = List(
Annotation(1, "value1", Some(Endpoint(1, 2, "service")))
))

val adjusted = ApplyTimestampAndDuration(span)
assert(adjusted.timestamp === Some(1))
assert(adjusted.duration === None)
}

test("duration isn't set when only same timestamps") {
val span = Span(1, "n", 2, None, None, None, annotations = List(
Annotation(1, "value1", Some(Endpoint(1, 2, "service"))),
Annotation(1, "value2", Some(Endpoint(1, 2, "service")))
))

val adjusted = ApplyTimestampAndDuration(span)
assert(adjusted.timestamp === Some(1))
assert(adjusted.duration === None)
}

/** Missing timestamp means the span cannot be placed on a timeline */
test("filters spans without a timestamp") {
assert(ApplyTimestampAndDuration(List(Span(12345, "methodcall2", 2))) == List())
}
}
Loading