Skip to content

Commit

Permalink
Merge pull request #44 from nlnwa/tracing
Browse files Browse the repository at this point in the history
Added tracing. Fixed threading issues.
  • Loading branch information
Langvann authored Apr 28, 2021
2 parents 4efbce4 + 3965d4d commit 0794c2c
Show file tree
Hide file tree
Showing 36 changed files with 1,193 additions and 958 deletions.
59 changes: 40 additions & 19 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,16 @@
<docker.tag>${project.version}</docker.tag>

<veidemann.api.version>1.0.0-beta23</veidemann.api.version>
<veidemann.commons.version>0.5.0</veidemann.commons.version>
<veidemann.rethinkdbadapter.version>0.7.0</veidemann.rethinkdbadapter.version>
<veidemann.commons.version>v0.6.0</veidemann.commons.version>
<veidemann.rethinkdbadapter.version>0.8.0</veidemann.rethinkdbadapter.version>

<com.github.netflix.concurrency-limits.version>0.3.7</com.github.netflix.concurrency-limits.version>
<log4j.version>2.13.3</log4j.version>
<io.prometheus.version>0.8.0</io.prometheus.version>
<io.opentracing.version>0.33.0</io.opentracing.version>

<junit.jupiter.version>5.7.0</junit.jupiter.version>

<!--Docker container versions for integration tests-->
<rethinkdb.version>2.3.6</rethinkdb.version>
<redis.version>6-alpine</redis.version>
<org.testcontainers.version>1.15.3</org.testcontainers.version>

<!-- Docker auth credentials -->
<!--suppress UnresolvedMavenProperty -->
Expand Down Expand Up @@ -93,9 +91,7 @@
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.4.1</version>
<type>jar</type>
<scope>compile</scope>
<version>3.6.0</version>
</dependency>

<dependency>
Expand All @@ -119,14 +115,14 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
<version>30.1.1-jre</version>
</dependency>

<!-- Configuration framework -->
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.3.3</version>
<version>1.4.1</version>
</dependency>

<!-- Do all logging thru Log4j -->
Expand Down Expand Up @@ -156,6 +152,28 @@
<version>${log4j.version}</version>
</dependency>

<!-- Tracing dependencies -->
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
<version>${io.opentracing.version}</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-util</artifactId>
<version>${io.opentracing.version}</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-grpc</artifactId>
<version>0.2.3</version>
</dependency>

<!-- Prometheus dependencies -->
<dependency>
<groupId>io.prometheus</groupId>
Expand Down Expand Up @@ -232,13 +250,19 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.15.2</version>
<version>${org.testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.2</version>
<version>${org.testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-mock</artifactId>
<version>${io.opentracing.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand All @@ -256,15 +280,10 @@
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.34.1</version>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>2.7.1</version>
<version>3.0.0</version>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down Expand Up @@ -325,6 +344,8 @@
<logService.host>localhost</logService.host>
<!--suppress UnresolvedMavenProperty -->
<logService.port>${logService.port}</logService.port>

<veidemann.rethinkdbadapter.version>v${veidemann.rethinkdbadapter.version}</veidemann.rethinkdbadapter.version>
</systemPropertyVariables>
</configuration>
</plugin>
Expand Down
36 changes: 15 additions & 21 deletions src/main/java/no/nb/nna/veidemann/frontier/FrontierService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@
*/
package no.nb.nna.veidemann.frontier;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigBeanFactory;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;
import io.jaegertracing.Configuration;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import no.nb.nna.veidemann.commons.client.OutOfScopeHandlerClient;
import no.nb.nna.veidemann.commons.db.DbService;
import no.nb.nna.veidemann.frontier.api.FrontierApiServer;
import no.nb.nna.veidemann.frontier.settings.Settings;
import no.nb.nna.veidemann.frontier.worker.DnsServiceClient;
import no.nb.nna.veidemann.frontier.worker.Frontier;
import no.nb.nna.veidemann.frontier.worker.RobotsServiceClient;
import no.nb.nna.veidemann.frontier.worker.LogServiceClient;
import no.nb.nna.veidemann.frontier.worker.OutOfScopeHandlerClient;
import no.nb.nna.veidemann.frontier.worker.RobotsServiceClient;
import no.nb.nna.veidemann.frontier.worker.ScopeServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,11 +40,6 @@

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

/**
* Class for launching the service.
Expand All @@ -53,19 +50,10 @@ public class FrontierService {

private static final Settings SETTINGS;

public static final ExecutorService asyncFunctionsExecutor;

static {
Config config = ConfigFactory.load();
config.checkValid(ConfigFactory.defaultReference());
SETTINGS = ConfigBeanFactory.create(config, Settings.class);

// TODO: Add tracing
// TracerFactory.init("Frontier");

asyncFunctionsExecutor = new ThreadPoolExecutor(2, 128, 15, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat("asyncFunc-%d").build(), new CallerRunsPolicy());
}

/**
Expand All @@ -81,6 +69,9 @@ public FrontierService() {
* @return this instance
*/
public FrontierService start() {
Tracer tracer = Configuration.fromEnv().getTracer();
GlobalTracer.registerIfAbsent(tracer);

DefaultExports.initialize();
try {
HTTPServer server = new HTTPServer(SETTINGS.getPrometheusPort());
Expand All @@ -90,16 +81,19 @@ public FrontierService start() {
}

JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(64);
jedisPoolConfig.setMaxTotal(256);
jedisPoolConfig.setMaxIdle(16);
jedisPoolConfig.setMinIdle(2);

try (DbService db = DbService.configure(SETTINGS);

JedisPool jedisPool = new JedisPool(jedisPoolConfig, URI.create("redis://" + SETTINGS.getRedisHost() + ':' + SETTINGS.getRedisPort()));

RobotsServiceClient robotsServiceClient = new RobotsServiceClient(
SETTINGS.getRobotsEvaluatorHost(), SETTINGS.getRobotsEvaluatorPort(), asyncFunctionsExecutor);
SETTINGS.getRobotsEvaluatorHost(), SETTINGS.getRobotsEvaluatorPort());

DnsServiceClient dnsServiceClient = new DnsServiceClient(
SETTINGS.getDnsResolverHost(), SETTINGS.getDnsResolverPort(), asyncFunctionsExecutor);
SETTINGS.getDnsResolverHost(), SETTINGS.getDnsResolverPort());

ScopeServiceClient scopeServiceClient = new ScopeServiceClient(
SETTINGS.getScopeserviceHost(), SETTINGS.getScopeservicePort());
Expand All @@ -110,7 +104,7 @@ public FrontierService start() {
LogServiceClient logServiceClient = new LogServiceClient(
SETTINGS.getLogServiceHost(), SETTINGS.getLogServicePort());

Frontier frontier = new Frontier(SETTINGS, jedisPool, robotsServiceClient, dnsServiceClient, scopeServiceClient,
Frontier frontier = new Frontier(tracer, SETTINGS, jedisPool, robotsServiceClient, dnsServiceClient, scopeServiceClient,
outOfScopeHandlerClient, logServiceClient);
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.services.HealthStatusManager;
import io.opentracing.contrib.grpc.TracingServerInterceptor;
import io.opentracing.contrib.grpc.TracingServerInterceptor.ServerRequestAttribute;
import no.nb.nna.veidemann.api.frontier.v1.FrontierGrpc;
import no.nb.nna.veidemann.frontier.worker.Frontier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -53,21 +55,21 @@ public FrontierApiServer(int port, int shutdownTimeoutSeconds, Frontier frontier
}

public FrontierApiServer(ServerBuilder<?> serverBuilder, Frontier frontier) {
// TODO: Add tracing
// ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor.Builder(GlobalTracer.get())
// .withTracedAttributes(ServerTracingInterceptor.ServerRequestAttribute.CALL_ATTRIBUTES,
// ServerTracingInterceptor.ServerRequestAttribute.METHOD_TYPE)
// .build();
TracingServerInterceptor tracingInterceptor = TracingServerInterceptor
.newBuilder()
.withTracer(frontier.getTracer())
.withStreaming()
.withTracedAttributes(ServerRequestAttribute.CALL_ATTRIBUTES, ServerRequestAttribute.METHOD_TYPE)
.build();

healthCheckerExecutorService = Executors.newScheduledThreadPool(1);
health = new HealthStatusManager();
healthCheckerExecutorService.scheduleAtFixedRate(new HealthChecker(frontier, health), 0, 1, TimeUnit.SECONDS);

frontierService = new FrontierService(frontier);
server = serverBuilder
// TODO: Add tracing
// .addService(ServerInterceptors.intercept(tracingInterceptor.intercept(frontierService),
.addService(ServerInterceptors.intercept(frontierService,
.addService(ServerInterceptors.intercept(
frontierService,
ConcurrencyLimitServerInterceptor.newBuilder(
new GrpcServerLimiterBuilder()
.partitionByMethod()
Expand All @@ -81,7 +83,8 @@ public FrontierApiServer(ServerBuilder<?> serverBuilder, Frontier frontier) {
.build(Gradient2Limit.newBuilder()
.build()))
.build())
.build()))
.build(),
tracingInterceptor))
.addService(health.getHealthService())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package no.nb.nna.veidemann.frontier.api;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
Expand Down Expand Up @@ -63,28 +61,10 @@ public void awaitTermination() throws InterruptedException {

@Override
public void crawlSeed(CrawlSeedRequest request, StreamObserver<CrawlExecutionId> responseObserver) {
// TODO: Add tracing
// try (ActiveSpan span = GlobalTracer.get()
// .buildSpan("scheduleSeed")
// .asChildOf(OpenTracingContextKey.activeSpan())
// .withTag(Tags.COMPONENT.getKey(), "Frontier")
// .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
// .withTag("uri", request.getSeed().getMeta().getName())
// .startActive()) {
try {
Futures.addCallback(ctx.getFrontier().scheduleSeed(request),
new FutureCallback<CrawlExecutionStatus>() {
public void onSuccess(CrawlExecutionStatus reply) {
responseObserver.onNext(CrawlExecutionId.newBuilder().setId(reply.getId()).build());
responseObserver.onCompleted();
}

public void onFailure(Throwable t) {
LOG.error("Crawl seed error: " + t.getMessage(), t);
Status status = Status.UNKNOWN.withDescription(t.toString());
responseObserver.onError(status.asException());
}
}, no.nb.nna.veidemann.frontier.FrontierService.asyncFunctionsExecutor);
CrawlExecutionStatus reply = ctx.getFrontier().scheduleSeed(request);
responseObserver.onNext(CrawlExecutionId.newBuilder().setId(reply.getId()).build());
responseObserver.onCompleted();
} catch (Exception e) {
LOG.error("Crawl seed error: " + e.getMessage(), e);
Status status = Status.UNKNOWN.withDescription(e.toString());
Expand Down
Loading

0 comments on commit 0794c2c

Please sign in to comment.