Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces LOCAL_COMPONENT("lc") binary annotation and example use-case #821

Merged
merged 1 commit into from
Nov 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ConfigSpec extends FunSuite with Matchers {
test("validate collector configs") {
val configSource = Seq(
"/collector-dev.scala",
"/collector-mysql.scala",
"/collector-cassandra.scala",
"/collector-redis.scala"
) map { r =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ object Constants {
val ServerAddr: String = "sa"
val WireSend: String = "ws"
val WireRecv: String = "wr"
val LocalComponent: String = "lc"

val CoreClient: Set[String] = Set(ClientSend, ClientSendFragment, ClientRecv, ClientRecvFragment)
val CoreServer: Set[String] = Set(ServerRecv, ServerRecvFragment, ServerSend, ServerSendFragment)
val CoreAddress: Set[String] = Set(ClientAddr, ServerAddr)
val CoreWire: Set[String] = Set(WireSend, WireRecv)
val CoreLocal: Set[String] = Set(LocalComponent)

val CoreAnnotations: Set[String] = CoreClient ++ CoreServer ++ CoreWire
val CoreAnnotations: Set[String] = CoreClient ++ CoreServer ++ CoreWire ++ CoreLocal

val CoreAnnotationNames: Map[String, String] = Map(
ClientSend -> "Client Send",
Expand All @@ -52,7 +54,8 @@ object Constants {
ClientAddr -> "Client Address",
ServerAddr -> "Server Address",
WireSend -> "Wire Send",
WireRecv -> "Wire Receive"
WireRecv -> "Wire Receive",
LocalComponent -> "Local Component"
)

/* 127.0.0.1 */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,7 @@ abstract class SpanStoreSpec extends JUnitSuite with Matchers {

/** Shows that duration queries go against the root span, not the child */
@Test def getTraces_duration() {
// Foreshadowing of local spans https://github.com/openzipkin/zipkin/issues/808
val archiver = List(binaryAnnotation("lc", "archiver"))
val archiver = List(binaryAnnotation(Constants.LocalComponent, "archiver"))
val targz = Span(1L, "targz", 1L, None, Some(100L), Some(200L), binaryAnnotations = archiver)
val tar = Span(1L, "tar", 2L, Some(1L), Some(100L), Some(150L), binaryAnnotations = archiver)
val gz = Span(1L, "gz", 3L, Some(1L), Some(250L), Some(50L), binaryAnnotations = archiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,35 @@
*/
package com.twitter.zipkin.builder

import ch.qos.logback.classic.{Logger, Level}
import com.twitter.conversions.time._
import com.twitter.finagle.ListeningServer
import com.twitter.finagle.stats.DefaultStatsReceiver
import com.twitter.finagle.tracing.{DefaultTracer, NullTracer}
import com.twitter.finagle.zipkin.thrift.{SpanStoreZipkinTracer, RawZipkinTracer}
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.zipkin.query.ZipkinQueryServer
import com.twitter.finagle.zipkin.thrift.{RawZipkinTracer, SpanStoreZipkinTracer}
import com.twitter.zipkin.query.{BootstrapTrace, ZipkinQueryServer}
import com.twitter.zipkin.storage.{DependencyStore, NullDependencyStore, SpanStore}
import org.slf4j.LoggerFactory

case class QueryServiceBuilder(override val defaultFinatraHttpPort: String = "0.0.0.0:9411",
override val defaultHttpPort: Int = 9901,
logLevel: String = "INFO",
spanStore: SpanStore,
dependencies: DependencyStore = new NullDependencyStore,
override val defaultHttpServerName: String = "zipkin-query"
) extends ZipkinQueryServer(spanStore, dependencies) with
Builder[RuntimeEnvironment => ListeningServer] {
) extends ZipkinQueryServer(spanStore, dependencies) {

override def apply() = (runtime: RuntimeEnvironment) => {
LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME)
.asInstanceOf[Logger].setLevel(Level.toLevel(logLevel))

/** If the span transport is set, trace accordingly, or disable tracing */
/** If the span transport is set, trace accordingly, or disable tracing. */
premain {
DefaultTracer.self = sys.env.get("TRANSPORT_TYPE") match {
case Some("scribe") => RawZipkinTracer(sys.env.get("SCRIBE_HOST").getOrElse("localhost"), sys.env.get("SCRIBE_PORT").getOrElse("1463").toInt)
case Some("http") => new SpanStoreZipkinTracer(spanStore, DefaultStatsReceiver.get)
case Some("http") => new SpanStoreZipkinTracer(spanStore, statsReceiver)
case _ => NullTracer
}
}

override def warmup() {
super.warmup()
BootstrapTrace.record("warmup")
}

val defaultLookback = sys.env.get("QUERY_LOOKBACK").getOrElse(7.days.inMicroseconds.toString)
nonExitingMain(Array(
"-local.doc.root", "/",
"-zipkin.queryService.lookback", defaultLookback
))
adminHttpServer
override def postWarmup() {
super.postWarmup()
BootstrapTrace.complete()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.twitter.zipkin.query

import com.twitter.finagle.tracing.{Annotation, DefaultTracer, Record, Trace}
import com.twitter.util.Time
import com.twitter.zipkin.Constants
import scala.collection.mutable.ArrayBuffer

/** This decouples bootstrap tracing from trace initialization. */
object BootstrapTrace {
private val id = Trace.nextId
private val records = ArrayBuffer[Record]()

record(Annotation.ServiceName("zipkin-query"))
record(Annotation.BinaryAnnotation(Constants.LocalComponent, "finatra"))
record(Annotation.Rpc("bootstrap"))
record("init")

private def record(ann: Annotation): Unit = records += Record(id, Time.now, ann, None)

def record(message: String): Unit = record(Annotation.Message(message))

/** records duration and flushes the trace */
def complete() = {
// TODO: DeadlineSpanMap needs an update to mark complete based on Span.duration
record(Annotation.ClientRecv())
records.foreach(DefaultTracer.self.record)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,34 @@
*/
package com.twitter.zipkin.query

import ch.qos.logback.classic.{Level, Logger}
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{Files, Resources}
import com.twitter.finagle.ListeningServer
import com.twitter.logging.Logger
import com.twitter.ostrich.admin.{RuntimeEnvironment, Service, ServiceTracker}
import com.twitter.util.{Await, Eval}
import com.twitter.conversions.time._
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.util.Eval
import com.twitter.zipkin.BuildProperties
import com.twitter.zipkin.builder.Builder
import com.twitter.zipkin.builder.QueryServiceBuilder
import org.slf4j.LoggerFactory

object Main {
val log = Logger.get(getClass.getName)

def main(args: Array[String]) {
log.info("Loading configuration")
def main(args: Array[String]) = {
val runtime = RuntimeEnvironment(BuildProperties, args)

// Fallback to bundled config resources, if there's no file at the path specified as -f
val source = if (runtime.configFile.exists()) Files.toString(runtime.configFile, UTF_8)
else Resources.toString(getClass.getResource(runtime.configFile.toString), UTF_8)

val builder = (new Eval).apply[Builder[RuntimeEnvironment => ListeningServer]](source)
try {
val query = builder.apply().apply(runtime)
Await.ready(query)
ServiceTracker.register(new Service() {
override def shutdown() = Await.ready(query.close())
val query = (new Eval).apply[QueryServiceBuilder](source)
LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME)
.asInstanceOf[Logger].setLevel(Level.toLevel(query.logLevel))

override def start() = {}
})
} catch {
case e: Exception =>
e.printStackTrace()
log.error(e, "Unexpected exception: %s", e.getMessage)
System.exit(0)
}
// Note: this is blocking, so nothing after this will be called.
val defaultLookback = sys.env.get("QUERY_LOOKBACK").getOrElse(7.days.inMicroseconds.toString)
query.nonExitingMain(Array(
"-local.doc.root", "/",
"-zipkin.queryService.lookback", defaultLookback
))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package com.twitter.zipkin.config

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Resources
import com.twitter.finagle.ListeningServer
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.util.Eval
import com.twitter.zipkin.builder.Builder
import com.twitter.zipkin.builder.QueryServiceBuilder
import org.scalatest.{FunSuite, Matchers}

class ConfigSpec extends FunSuite with Matchers {
Expand All @@ -30,16 +28,16 @@ class ConfigSpec extends FunSuite with Matchers {
test("validate query configs") {
val configSource = Seq(
"/query-dev.scala",
"/query-mysql.scala",
"/query-cassandra.scala",
"/query-redis.scala"
) map { r =>
Resources.toString(getClass.getResource(r), UTF_8)
}

for (source <- configSource) {
val config = eval[Builder[RuntimeEnvironment => ListeningServer]](source)
config should not be(Nil)
config.apply()
val query = eval[QueryServiceBuilder](source)
query should not be(Nil)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ class ZipkinQueryServer(spanStore: SpanStore, dependencyStore: DependencyStore)

override def httpExternalSocketAddress = Option(httpServer).map(_.boundAddress)

override def waitForServer() {
Await.ready(httpServer)
}
override def waitForServer() = Await.ready(httpServer)

}
28 changes: 28 additions & 0 deletions zipkin-thrift/src/main/thrift/com/twitter/zipkin/zipkinCore.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,34 @@ const string SERVER_SEND_FRAGMENT = "ssf"
*/
const string SERVER_RECV_FRAGMENT = "srf"

#***** BinaryAnnotation.key ******
/**
* The value of "lc" is the component or namespace of a local span.
*
* BinaryAnnotation.host adds service context needed to support queries.
*
* Local Component("lc") supports three key features: flagging, query by
* service and filtering Span.name by namespace.
*
* While structurally the same, local spans are fundamentally different than
* RPC spans in how they should be interpreted. For example, zipkin v1 tools
* center on RPC latency and service graphs. Root local-spans are neither
* indicative of critical path RPC latency, nor have impact on the shape of a
* service graph. By flagging with "lc", tools can special-case local spans.
*
* Zipkin v1 Spans are unqueryable unless they can be indexed by service name.
* The only path to a service name is by (Binary)?Annotation.host.serviceName.
* By logging "lc", a local span can be queried even if no other annotations
* are logged.
*
* The value of "lc" is the namespace of Span.name. For example, it might be
* "finatra2", for a span named "bootstrap". "lc" allows you to resolves
* conflicts for the same Span.name, for example "finatra/bootstrap" vs
* "finch/bootstrap". Using local component, you'd search for spans named
* "bootstrap" where "lc=finch"
*/
const string LOCAL_COMPONENT = "lc"

#***** BinaryAnnotation.key where value = [1] and annotation_type = BOOL ******
/**
* Indicates a client address ("ca") in a span. Most likely, there's only one.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.twitter.zipkin.web

import com.twitter.finagle.tracing.{Annotation, DefaultTracer, Record, Trace}
import com.twitter.util.Time
import com.twitter.zipkin.Constants

import scala.collection.mutable.ArrayBuffer

/** This decouples bootstrap tracing from trace initialization. */
object BootstrapTrace {
private val id = Trace.nextId
private val records = ArrayBuffer[Record]()

record(Annotation.ServiceName("zipkin-web"))
record(Annotation.BinaryAnnotation(Constants.LocalComponent, "twitter-server"))
record(Annotation.Rpc("bootstrap"))
record("init")

private def record(ann: Annotation): Unit = records += Record(id, Time.now, ann, None)

def record(message: String): Unit = record(Annotation.Message(message))

/** records duration and flushes the trace */
def complete() = {
// TODO: DeadlineSpanMap needs an update to mark complete based on Span.duration
record(Annotation.ClientRecv())
records.foreach(DefaultTracer.self.record)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,8 @@ class Handlers(mustacheGenerator: ZipkinMustache, queryExtractor: QueryExtractor
val key = ZConstants.CoreAnnotationNames.get(ann.key).get
val value = ann.host.map { e => s"${e.getHostAddress}:${e.getUnsignedPort}" }.get
JsonBinaryAnnotation(key, value, None, ann.host.map(JsonEndpoint))
case ann if ZConstants.CoreAnnotationNames.contains(ann.key) =>
JsonBinaryAnnotation(ann.copy(key = ZConstants.CoreAnnotationNames.get(ann.key).get))
case ann => JsonBinaryAnnotation(ann)
}

Expand Down
18 changes: 14 additions & 4 deletions zipkin-web/src/main/scala/com/twitter/zipkin/web/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package com.twitter.zipkin.web

import ch.qos.logback.classic.{Logger, Level}
import java.net.InetSocketAddress

import ch.qos.logback.classic.{Level, Logger}
import com.twitter.app.App
import com.twitter.finagle._
import com.twitter.finagle.http.{HttpMuxer, Request, Response}
Expand All @@ -29,7 +31,6 @@ import com.twitter.server.TwitterServer
import com.twitter.util.Await
import com.twitter.zipkin.json.ZipkinJson
import com.twitter.zipkin.web.mustache.ZipkinMustache
import java.net.InetSocketAddress
import org.slf4j.LoggerFactory

trait ZipkinWebFactory { self: App =>
Expand Down Expand Up @@ -113,20 +114,29 @@ trait ZipkinWebFactory { self: App =>
}

object Main extends TwitterServer with ZipkinWebFactory {
def main() {
// If the span transport is set, trace accordingly, or disable tracing

/** If the span transport is set, trace accordingly, or disable tracing. */
premain {
DefaultTracer.self = sys.env.get("TRANSPORT_TYPE") match {
case Some("scribe") => RawZipkinTracer(sys.env.get("SCRIBE_HOST").getOrElse("localhost"), sys.env.get("SCRIBE_PORT").getOrElse("1463").toInt)
case Some("http") => new HttpZipkinTracer(queryDest(), DefaultStatsReceiver.get)
case _ => NullTracer
}
}

def main() = {
BootstrapTrace.record("main")

// Httpx.server will trace all paths. We don't care about static assets, so need to customize
val server = Http.Server(StackServer.newStack
.replace(FilteredHttpEntrypointTraceInitializer.role, FilteredHttpEntrypointTraceInitializer))
.configured(param.Label("zipkin-web"))
.serve(webServerPort(), newWebServer(stats = statsReceiver.scope("zipkin-web")))
onExit { server.close() }

BootstrapTrace.complete()

// Note: this is blocking, so nothing after this will be called.
Await.ready(server)
}
}