diff --git a/pom.xml b/pom.xml
index fcd0348..49daba0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
11
${project.version}
- 1.0.0-beta21
+ 1.0.0-beta22
0.4.9
0.6.0
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/FrontierService.java b/src/main/java/no/nb/nna/veidemann/frontier/FrontierService.java
index 9f9e995..9f72ef6 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/FrontierService.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/FrontierService.java
@@ -25,7 +25,6 @@
import no.nb.nna.veidemann.commons.client.OutOfScopeHandlerClient;
import no.nb.nna.veidemann.commons.client.RobotsServiceClient;
import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.commons.opentracing.TracerFactory;
import no.nb.nna.veidemann.frontier.api.FrontierApiServer;
import no.nb.nna.veidemann.frontier.settings.Settings;
import no.nb.nna.veidemann.frontier.worker.DnsServiceClient;
@@ -60,7 +59,8 @@ public class FrontierService {
config.checkValid(ConfigFactory.defaultReference());
SETTINGS = ConfigBeanFactory.create(config, Settings.class);
- TracerFactory.init("Frontier");
+// TODO: Add tracing
+// TracerFactory.init("Frontier");
asyncFunctionsExecutor = new ThreadPoolExecutor(2, 128, 15, TimeUnit.SECONDS,
new SynchronousQueue<>(),
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierApiServer.java b/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierApiServer.java
index 999d71e..d8cd239 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierApiServer.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierApiServer.java
@@ -23,8 +23,6 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.services.HealthStatusManager;
-import io.opentracing.contrib.ServerTracingInterceptor;
-import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.frontier.v1.FrontierGrpc;
import no.nb.nna.veidemann.frontier.worker.Frontier;
import org.slf4j.Logger;
@@ -55,10 +53,11 @@ public FrontierApiServer(int port, int shutdownTimeoutSeconds, Frontier frontier
}
public FrontierApiServer(ServerBuilder> serverBuilder, Frontier frontier) {
- ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor.Builder(GlobalTracer.get())
- .withTracedAttributes(ServerTracingInterceptor.ServerRequestAttribute.CALL_ATTRIBUTES,
- ServerTracingInterceptor.ServerRequestAttribute.METHOD_TYPE)
- .build();
+// TODO: Add tracing
+// ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor.Builder(GlobalTracer.get())
+// .withTracedAttributes(ServerTracingInterceptor.ServerRequestAttribute.CALL_ATTRIBUTES,
+// ServerTracingInterceptor.ServerRequestAttribute.METHOD_TYPE)
+// .build();
healthCheckerExecutorService = Executors.newScheduledThreadPool(1);
health = new HealthStatusManager();
@@ -66,7 +65,9 @@ public FrontierApiServer(ServerBuilder> serverBuilder, Frontier frontier) {
frontierService = new FrontierService(frontier);
server = serverBuilder
- .addService(ServerInterceptors.intercept(tracingInterceptor.intercept(frontierService),
+// TODO: Add tracing
+// .addService(ServerInterceptors.intercept(tracingInterceptor.intercept(frontierService),
+ .addService(ServerInterceptors.intercept(frontierService,
ConcurrencyLimitServerInterceptor.newBuilder(
new GrpcServerLimiterBuilder()
.partitionByMethod()
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierService.java b/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierService.java
index 4444a95..bec448d 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierService.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/api/FrontierService.java
@@ -21,10 +21,6 @@
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
-import io.opentracing.ActiveSpan;
-import io.opentracing.contrib.OpenTracingContextKey;
-import io.opentracing.tag.Tags;
-import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.frontier.v1.CountResponse;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionId;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
@@ -67,13 +63,15 @@ public void awaitTermination() throws InterruptedException {
@Override
public void crawlSeed(CrawlSeedRequest request, StreamObserver responseObserver) {
- try (ActiveSpan span = GlobalTracer.get()
- .buildSpan("scheduleSeed")
- .asChildOf(OpenTracingContextKey.activeSpan())
- .withTag(Tags.COMPONENT.getKey(), "Frontier")
- .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
- .withTag("uri", request.getSeed().getMeta().getName())
- .startActive()) {
+// TODO: Add tracing
+// try (ActiveSpan span = GlobalTracer.get()
+// .buildSpan("scheduleSeed")
+// .asChildOf(OpenTracingContextKey.activeSpan())
+// .withTag(Tags.COMPONENT.getKey(), "Frontier")
+// .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
+// .withTag("uri", request.getSeed().getMeta().getName())
+// .startActive()) {
+ try {
Futures.addCallback(ctx.getFrontier().scheduleSeed(request),
new FutureCallback() {
public void onSuccess(CrawlExecutionStatus reply) {
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/api/GetNextPageHandler.java b/src/main/java/no/nb/nna/veidemann/frontier/api/GetNextPageHandler.java
index e7a60c9..60fc4f8 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/api/GetNextPageHandler.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/api/GetNextPageHandler.java
@@ -117,6 +117,7 @@ public void onNext(PageHarvest value) {
@Override
public void onError(Throwable t) {
+ LOG.warn("gRPC Error from harvester", t);
try {
try {
if (postFetchHandler == null) {
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java b/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java
index 6ffe92f..bdb2856 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java
@@ -219,7 +219,7 @@ private QueuedUri getNextQueuedUriToFetch() {
LOG.trace("Found Crawl Host Group ({})", chgId);
// try to find URI for CrawlHostGroup
- FutureOptional foqu = getNextFetchableQueuedUriToForCrawlHostGroup(jedisContext, chg, conn);
+ FutureOptional foqu = getNextFetchableQueuedUriForCrawlHostGroup(jedisContext, chg, conn);
if (foqu.isPresent()) {
LOG.debug("Found Queued URI: {}, crawlHostGroup: {}",
@@ -358,7 +358,7 @@ public static String uriHash(String uri) {
return Hashing.sha256().hashUnencodedChars(uri).toString();
}
- FutureOptional getNextFetchableQueuedUriToForCrawlHostGroup(JedisContext ctx, CrawlHostGroup crawlHostGroup, RethinkDbConnection conn) {
+ FutureOptional getNextFetchableQueuedUriForCrawlHostGroup(JedisContext ctx, CrawlHostGroup crawlHostGroup, RethinkDbConnection conn) {
NextUriScriptResult res = nextUriScript.run(ctx, crawlHostGroup);
if (res.future != null) {
return res.future;
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueWorker.java b/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueWorker.java
index 9aedd48..a39070e 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueWorker.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueWorker.java
@@ -13,7 +13,6 @@
import no.nb.nna.veidemann.frontier.db.script.ChgDelayedQueueScript;
import no.nb.nna.veidemann.frontier.db.script.RedisJob.JedisContext;
import no.nb.nna.veidemann.frontier.worker.Frontier;
-import no.nb.nna.veidemann.frontier.worker.StatusWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
@@ -96,21 +95,10 @@ public void run() {
String toBeRemoved = ctx.getJedis().lpop(CRAWL_EXECUTION_TIMEOUT_KEY);
while (toBeRemoved != null) {
try {
- StatusWrapper s = StatusWrapper.getStatusWrapper(frontier, toBeRemoved);
- switch (s.getState()) {
- case SLEEPING:
- case CREATED:
- LOG.debug("CrawlExecution '{}' with state {} timed out", s.getId(), s.getState());
- s.incrementDocumentsDenied(frontier.getCrawlQueueManager()
- .deleteQueuedUrisForExecution(ctx, toBeRemoved))
- .setEndState(State.ABORTED_TIMEOUT)
- .saveStatus();
- break;
- default:
- LOG.trace("CrawlExecution '{}' with state {} was already finished", s.getId(), s.getState());
- }
+ conn.getExecutionsAdapter().setCrawlExecutionStateAborted(toBeRemoved, State.ABORTED_TIMEOUT);
} catch (Exception e) {
// Don't worry execution will be deleted at some point later
+ ctx.getJedis().rpush(CRAWL_EXECUTION_TIMEOUT_KEY, toBeRemoved);
}
toBeRemoved = ctx.getJedis().lpop(CRAWL_EXECUTION_TIMEOUT_KEY);
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/JobExecutionGetScript.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/JobExecutionGetScript.java
index 8fc68f5..610cc27 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/JobExecutionGetScript.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/JobExecutionGetScript.java
@@ -16,6 +16,9 @@ public JobExecutionStatus run(JedisContext ctx, String jobExecutionId) {
return execute(ctx, jedis -> {
String key = JOB_EXECUTION_PREFIX + jobExecutionId;
+ if (!jedis.exists(key)) {
+ return null;
+ }
JobExecutionStatusRedisMapper m = new JobExecutionStatusRedisMapper(jedis.hgetAll(key));
return m.toJobExecutionStatus(jobExecutionId);
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/CrawlExecutionHelpers.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/CrawlExecutionHelpers.java
index 1522550..41b4d6c 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/CrawlExecutionHelpers.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/CrawlExecutionHelpers.java
@@ -38,7 +38,6 @@ public class CrawlExecutionHelpers {
public static void postFetchFinally(Frontier frontier, StatusWrapper status, QueuedUriWrapper qUri, long delayMs) {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());
-
try {
if (qUri.hasError() && qUri.getDiscoveryPath().isEmpty()) {
if (qUri.getError().getCode() == ExtraStatusCodes.PRECLUDED_BY_ROBOTS.getCode()) {
@@ -99,15 +98,12 @@ public static void endCrawl(Frontier frontier, StatusWrapper status, State state
}
public static boolean isAborted(Frontier frontier, StatusWrapper status) throws DbException {
- switch (status.getState()) {
+ switch (status.getDesiredState()) {
case ABORTED_MANUAL:
case ABORTED_TIMEOUT:
case ABORTED_SIZE:
- status.incrementDocumentsDenied(frontier.getCrawlQueueManager()
- .deleteQueuedUrisForExecution(status.getId()));
-
- // Re-set end state to ensure end time is updated
- status.setEndState(status.getState()).saveStatus();
+ // Set end state to desired state
+ status.setEndState(status.getDesiredState()).saveStatus();
return true;
}
return false;
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/DnsServiceClient.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/DnsServiceClient.java
index a3d963c..1137067 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/DnsServiceClient.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/DnsServiceClient.java
@@ -23,8 +23,6 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
-import io.opentracing.contrib.ClientTracingInterceptor;
-import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.config.v1.ConfigRef;
import no.nb.nna.veidemann.api.dnsresolver.v1.DnsResolverGrpc;
import no.nb.nna.veidemann.api.dnsresolver.v1.ResolveReply;
@@ -58,8 +56,10 @@ public DnsServiceClient(final String host, final int port, ExecutorService execu
public DnsServiceClient(ManagedChannelBuilder> channelBuilder, ExecutorService executor) {
LOG.debug("Setting up DNS service client");
- ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
- channel = channelBuilder.intercept(tracingInterceptor).build();
+// TODO: Add tracing
+// ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
+// channel = channelBuilder.intercept(tracingInterceptor).build();
+ channel = channelBuilder.build();
futureStub = DnsResolverGrpc.newFutureStub(channel);
this.executor = executor;
}
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/Frontier.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/Frontier.java
index 264b1d4..ccb83da 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/Frontier.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/Frontier.java
@@ -166,7 +166,9 @@ public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper statu
Futures.transformAsync(future, c -> {
switch (c) {
case DENIED:
- if (qUri.shouldInclude()) {
+ if (status.getState() == State.ABORTED_MANUAL) {
+ // Job was aborted before crawl execution was created. Ignore
+ } else if (qUri.shouldInclude()) {
// Seed was in scope, but failed for other reason
LOG.warn("Seed '{}' could not be crawled. Error: {}", qUri.getUri(), qUri.getError());
status.setEndState(State.FAILED)
@@ -227,6 +229,7 @@ public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper statu
.setError(ExtraStatusCodes.ILLEGAL_URI.toFetchError(ex.toString()))
.saveStatus();
} catch (Exception ex) {
+ LOG.warn(ex.toString(), ex);
status.incrementDocumentsFailed()
.setEndState(CrawlExecutionStatus.State.FAILED)
.setError(ExtraStatusCodes.RUNTIME_EXCEPTION.toFetchError(ex.toString()))
@@ -283,6 +286,9 @@ public CrawlExecutionStatus createCrawlExecutionStatus(String jobId, String jobE
Map rMap = ProtoUtils.protoToRethink(status);
rMap.put("lastChangeTime", r.now());
rMap.put("createdTime", r.now());
+ // Set desiredState to ABORTED_MANUAL if JobExecution has desiredState ABORTED_MANUAL.
+ rMap.put("desiredState", r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId).g("desiredState").default_("")
+ .do_(j -> r.branch(j.eq("ABORTED_MANUAL"), "ABORTED_MANUAL", "UNDEFINED")));
crawlQueueManager.updateJobExecutionStatus(jobExecutionId, State.UNDEFINED, State.CREATED, CrawlExecutionStatusChange.getDefaultInstance());
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java
index 85a2ae0..117c61c 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java
@@ -17,7 +17,6 @@
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
-import io.opentracing.Span;
import no.nb.nna.veidemann.api.commons.v1.Error;
import no.nb.nna.veidemann.api.config.v1.Annotation;
import no.nb.nna.veidemann.api.config.v1.ConfigObject;
@@ -58,7 +57,7 @@ public class PostFetchHandler {
private long delayMs = 0L;
private long fetchTimeMs = 0L;
- private Span span;
+// private Span span;
private AtomicBoolean done = new AtomicBoolean();
private AtomicBoolean finalized = new AtomicBoolean();
@@ -160,30 +159,35 @@ public void postFetchFinally() {
try {
calculateDelay();
} catch (DbException e) {
- e.printStackTrace();
+ LOG.error(e.toString(), e);
}
frontier.postFetchThreadPool.submit(() -> {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());
-
- // Handle outlinks
- for (QueuedUri outlink : outlinkQueue) {
- try {
- OutlinkHandler.processOutlink(frontier, status, qUri, outlink, scriptParameters, status.getCrawlJobConfig().getCrawlJob().getScopeScriptRef());
- } catch (DbException e) {
- // An error here indicates problems with DB communication. No idea how to handle that yet.
- LOG.error("Error processing outlink: {}", e.toString(), e);
- } catch (Throwable e) {
- // Catch everything to ensure crawl host group gets released.
- // Discovering this message in logs should be investigated as a possible bug.
- LOG.error("Unknown error while processing outlink. Might be a bug", e);
+ try {
+ if (!CrawlExecutionHelpers.isAborted(frontier, status)) {
+ // Handle outlinks
+ for (QueuedUri outlink : outlinkQueue) {
+ try {
+ OutlinkHandler.processOutlink(frontier, status, qUri, outlink, scriptParameters, status.getCrawlJobConfig().getCrawlJob().getScopeScriptRef());
+ } catch (DbException e) {
+ // An error here indicates problems with DB communication. No idea how to handle that yet.
+ LOG.error("Error processing outlink: {}", e.toString(), e);
+ } catch (Throwable e) {
+ // Catch everything to ensure crawl host group gets released.
+ // Discovering this message in logs should be investigated as a possible bug.
+ LOG.error("Unknown error while processing outlink. Might be a bug", e);
+ }
+ }
}
+ } catch (DbException e) {
+ LOG.error(e.toString(), e);
}
CrawlExecutionHelpers.postFetchFinally(frontier, status, qUri, getDelay(TimeUnit.MILLISECONDS));
- span.finish();
+// span.finish();
});
}
}
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/PreFetchHandler.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/PreFetchHandler.java
index 5a4fd8a..e15ea6e 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/PreFetchHandler.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/PreFetchHandler.java
@@ -15,18 +15,13 @@
*/
package no.nb.nna.veidemann.frontier.worker;
-import io.opentracing.Span;
-import io.opentracing.tag.Tags;
-import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.config.v1.Annotation;
import no.nb.nna.veidemann.api.config.v1.ConfigObject;
import no.nb.nna.veidemann.api.config.v1.ConfigRef;
-import no.nb.nna.veidemann.api.config.v1.CrawlLimitsConfig;
import no.nb.nna.veidemann.api.config.v1.Kind;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus.State;
import no.nb.nna.veidemann.api.frontier.v1.PageHarvestSpec;
import no.nb.nna.veidemann.api.frontier.v1.QueuedUri;
-import no.nb.nna.veidemann.commons.ExtraStatusCodes;
import no.nb.nna.veidemann.commons.db.DbException;
import no.nb.nna.veidemann.db.ProtoUtils;
import no.nb.nna.veidemann.frontier.worker.Preconditions.PreconditionState;
@@ -48,7 +43,7 @@ public class PreFetchHandler {
final Frontier frontier;
QueuedUriWrapper qUri;
- private Span span;
+// private Span span;
public PreFetchHandler(QueuedUri uri, Frontier frontier) throws DbException {
this.frontier = frontier;
@@ -62,8 +57,7 @@ public PreFetchHandler(QueuedUri uri, Frontier frontier) throws DbException {
try {
this.qUri = QueuedUriWrapper.getQueuedUriWrapperWithScopeCheck(frontier, uri, collectionConfig.getMeta().getName(),
- scriptParameters, status.getCrawlJobConfig().getCrawlJob().getScopeScriptRef())
- .clearError();
+ scriptParameters, status.getCrawlJobConfig().getCrawlJob().getScopeScriptRef()).clearError();
} catch (URISyntaxException ex) {
throw new RuntimeException(ex);
}
@@ -84,39 +78,20 @@ public boolean preFetch() throws DbException {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());
- span = GlobalTracer.get()
- .buildSpan("runNextFetch")
- .withTag(Tags.COMPONENT.getKey(), "CrawlExecution")
- .withTag("uri", qUri.getUri())
- .withTag("executionId", status.getId())
- .ignoreActiveSpan()
- .startManual();
+// TODO: Add tracing
+// span = GlobalTracer.get()
+// .buildSpan("runNextFetch")
+// .withTag(Tags.COMPONENT.getKey(), "CrawlExecution")
+// .withTag("uri", qUri.getUri())
+// .withTag("executionId", status.getId())
+// .ignoreActiveSpan()
+// .startManual();
if (!qUri.getCrawlHostGroup().getSessionToken().isEmpty()) {
throw new IllegalStateException("Fetching in progress from another harvester");
}
- CrawlLimitsConfig limits = status.getCrawlJobConfig().getCrawlJob().getLimits();
try {
- LimitsCheck.isLimitReached(frontier, limits, status, qUri);
- if (CrawlExecutionHelpers.isAborted(frontier, status)) {
- status.removeCurrentUri(qUri);
- if (qUri.hasError() && qUri.getDiscoveryPath().isEmpty()) {
- DbUtil.writeLog(qUri);
- if (qUri.getError().getCode() == ExtraStatusCodes.PRECLUDED_BY_ROBOTS.getCode()) {
- // Seed precluded by robots.txt; mark crawl as finished
- CrawlExecutionHelpers.endCrawl(frontier, status, State.FINISHED, qUri.getError());
- } else {
- // Seed failed; mark crawl as failed
- CrawlExecutionHelpers.endCrawl(frontier, status, State.FAILED, qUri.getError());
- }
- } else if (frontier.getCrawlQueueManager().countByCrawlExecution(status.getId()) == 0) {
- // No URIs in queue; mark crawl as finished
- CrawlExecutionHelpers.endCrawl(frontier, status, State.FINISHED);
- }
- return false;
- }
-
String curCrawlHostGroupId = qUri.getCrawlHostGroupId();
PreconditionState check = Preconditions.checkPreconditions(frontier, status.getCrawlConfig(), status, qUri).get();
switch (check) {
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java
index 7b639cd..8c55284 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java
@@ -21,6 +21,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import no.nb.nna.veidemann.api.config.v1.ConfigObject;
+import no.nb.nna.veidemann.api.config.v1.CrawlLimitsConfig;
import no.nb.nna.veidemann.api.config.v1.PolitenessConfig.RobotsPolicy;
import no.nb.nna.veidemann.commons.ExtraStatusCodes;
import no.nb.nna.veidemann.commons.db.DbException;
@@ -40,7 +41,8 @@ public class Preconditions {
public enum PreconditionState {
OK,
DENIED,
- RETRY
+ RETRY,
+// FINISHED
}
private Preconditions() {
@@ -51,6 +53,14 @@ public static ListenableFuture checkPreconditions(Frontier fr
qUri.clearError();
+ if (CrawlExecutionHelpers.isAborted(frontier, status)) {
+ return Futures.immediateFuture(PreconditionState.DENIED);
+ }
+
+ if (isLimitReached(frontier, status, qUri)) {
+ return Futures.immediateFuture(PreconditionState.DENIED);
+ }
+
if (!qUri.shouldInclude()) {
LOG.debug("URI '{}' precluded by scope check. Reason: {}", qUri.getUri(), qUri.getExcludedReasonStatusCode());
switch (qUri.getExcludedReasonStatusCode()) {
@@ -81,6 +91,11 @@ public static ListenableFuture checkPreconditions(Frontier fr
}
+ static boolean isLimitReached(Frontier frontier, StatusWrapper status, QueuedUriWrapper qUri) throws DbException {
+ CrawlLimitsConfig limits = status.getCrawlJobConfig().getCrawlJob().getLimits();
+ return LimitsCheck.isLimitReached(frontier, limits, status, qUri);
+ }
+
static class ResolveDnsCallback implements FutureCallback {
private final Frontier frontier;
private final QueuedUriWrapper qUri;
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/ScopeServiceClient.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/ScopeServiceClient.java
index d2a10cc..0f7d0e2 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/ScopeServiceClient.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/ScopeServiceClient.java
@@ -20,8 +20,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.opentracing.contrib.ClientTracingInterceptor;
-import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.scopechecker.v1.ScopeCheckRequest;
import no.nb.nna.veidemann.api.scopechecker.v1.ScopeCheckResponse;
import no.nb.nna.veidemann.api.scopechecker.v1.ScopesCheckerServiceGrpc;
@@ -59,8 +57,10 @@ public ScopeServiceClient(final String host, final int port) {
public ScopeServiceClient(ManagedChannelBuilder> channelBuilder) {
LOG.debug("Setting up Frontier client");
- ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
- channel = channelBuilder.intercept(tracingInterceptor).build();
+// TODO: Add tracing
+// ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
+// channel = channelBuilder.intercept(tracingInterceptor).build();
+ channel = channelBuilder.build();
canonBlockingStub = UriCanonicalizerServiceGrpc.newBlockingStub(channel);
canonFutureStub = UriCanonicalizerServiceGrpc.newFutureStub(channel);
scopeBlockingStub = ScopesCheckerServiceGrpc.newBlockingStub(channel);
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/StatusWrapper.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/StatusWrapper.java
index 3c5243d..cfd97e5 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/StatusWrapper.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/StatusWrapper.java
@@ -177,6 +177,7 @@ public synchronized StatusWrapper saveStatus() throws DbException {
// Check if this update was setting the end time
boolean wasNotEnded = changes.get(0).get("old_val") == null || changes.get(0).get("old_val").get("endTime") == null;
CrawlExecutionStatus newDoc = ProtoUtils.rethinkToProto(changes.get(0).get("new_val"), CrawlExecutionStatus.class);
+
frontier.getCrawlQueueManager().updateJobExecutionStatus(newDoc.getJobExecutionId(), status.getState(), newDoc.getState(), change);
if (wasNotEnded && newDoc.hasEndTime()) {
updateJobExecution(conn, newDoc.getJobExecutionId());
@@ -191,6 +192,9 @@ public synchronized StatusWrapper saveStatus() throws DbException {
private void updateJobExecution(RethinkDbConnection conn, String jobExecutionId) throws DbException {
JobExecutionStatus tjes = frontier.getCrawlQueueManager().getTempJobExecutionStatus(jobExecutionId);
+ if (tjes == null) {
+ return;
+ }
// Get a count of still running CrawlExecutions for this execution's JobExecution
Long notEndedCount = tjes.getExecutionsStateMap().entrySet().stream()
@@ -219,7 +223,11 @@ private void updateJobExecution(RethinkDbConnection conn, String jobExecutionId)
state = jes.getState();
break;
default:
- state = JobExecutionStatus.State.FINISHED;
+ if (jes.getDesiredState() != null && jes.getDesiredState() != JobExecutionStatus.State.UNDEFINED) {
+ state = jes.getDesiredState();
+ } else {
+ state = JobExecutionStatus.State.FINISHED;
+ }
break;
}
@@ -277,6 +285,10 @@ public CrawlExecutionStatus.State getState() {
return getCrawlExecutionStatus().getState();
}
+ public CrawlExecutionStatus.State getDesiredState() {
+ return getCrawlExecutionStatus().getDesiredState();
+ }
+
public StatusWrapper setState(CrawlExecutionStatus.State state) {
dirty = true;
getChange().setState(state);
diff --git a/src/main/resources/lua/jobexecution_update.lua b/src/main/resources/lua/jobexecution_update.lua
index b8a96b5..f444493 100644
--- a/src/main/resources/lua/jobexecution_update.lua
+++ b/src/main/resources/lua/jobexecution_update.lua
@@ -11,20 +11,24 @@
--- ARGV[9]: bytesCrawled
---
--- Update states
-if ARGV[1] ~= 'UNDEFINED' then
- redis.call('HINCRBY', KEYS[1], ARGV[1], -1)
-end
+if ARGV[2] == 'CREATED' or redis.call('EXISTS', KEYS[1]) == 1 then
-if ARGV[2] ~= 'UNDEFINED' then
- redis.call('HINCRBY', KEYS[1], ARGV[2], 1)
-end
+ -- Update states
+ if ARGV[1] ~= 'UNDEFINED' then
+ redis.call('HINCRBY', KEYS[1], ARGV[1], -1)
+ end
--- Update stats
-redis.call('HINCRBY', KEYS[1], "documentsCrawled", ARGV[3])
-redis.call('HINCRBY', KEYS[1], "documentsDenied", ARGV[4])
-redis.call('HINCRBY', KEYS[1], "documentsFailed", ARGV[5])
-redis.call('HINCRBY', KEYS[1], "documentsOutOfScope", ARGV[6])
-redis.call('HINCRBY', KEYS[1], "documentsRetried", ARGV[7])
-redis.call('HINCRBY', KEYS[1], "urisCrawled", ARGV[8])
-redis.call('HINCRBY', KEYS[1], "bytesCrawled", ARGV[9])
+ if ARGV[2] ~= 'UNDEFINED' then
+ redis.call('HINCRBY', KEYS[1], ARGV[2], 1)
+ end
+
+ -- Update stats
+ redis.call('HINCRBY', KEYS[1], "documentsCrawled", ARGV[3])
+ redis.call('HINCRBY', KEYS[1], "documentsDenied", ARGV[4])
+ redis.call('HINCRBY', KEYS[1], "documentsFailed", ARGV[5])
+ redis.call('HINCRBY', KEYS[1], "documentsOutOfScope", ARGV[6])
+ redis.call('HINCRBY', KEYS[1], "documentsRetried", ARGV[7])
+ redis.call('HINCRBY', KEYS[1], "urisCrawled", ARGV[8])
+ redis.call('HINCRBY', KEYS[1], "bytesCrawled", ARGV[9])
+
+end
\ No newline at end of file
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/api/FrontierClientMock.java b/src/test/java/no/nb/nna/veidemann/frontier/api/FrontierClientMock.java
index 8b0d557..e61ecf7 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/api/FrontierClientMock.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/api/FrontierClientMock.java
@@ -21,8 +21,6 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
-import io.opentracing.contrib.ClientTracingInterceptor;
-import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.commons.v1.Error;
import no.nb.nna.veidemann.api.frontier.v1.FrontierGrpc;
import no.nb.nna.veidemann.api.frontier.v1.PageHarvest;
@@ -48,8 +46,7 @@ public class FrontierClientMock implements AutoCloseable {
* Construct client for accessing Frontier using the existing channel.
*/
public FrontierClientMock(ManagedChannelBuilder> channelBuilder) {
- ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
- channel = channelBuilder.intercept(tracingInterceptor).build();
+ channel = channelBuilder.build();
asyncStub = FrontierGrpc.newStub(channel).withWaitForReady();
}
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
index 2e86838..c70ec56 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
@@ -3,6 +3,8 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import no.nb.nna.veidemann.api.config.v1.CrawlLimitsConfig;
+import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
import no.nb.nna.veidemann.api.frontier.v1.FrontierGrpc;
import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus.State;
@@ -15,6 +17,7 @@
import no.nb.nna.veidemann.db.RethinkDbConnection;
import no.nb.nna.veidemann.db.Tables;
import no.nb.nna.veidemann.db.initializer.RethinkDbInitializer;
+import no.nb.nna.veidemann.frontier.db.CrawlQueueManager;
import no.nb.nna.veidemann.frontier.settings.Settings;
import no.nb.nna.veidemann.frontier.testutil.DnsResolverMock;
import no.nb.nna.veidemann.frontier.testutil.HarvesterMock;
@@ -41,12 +44,14 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
+import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -55,6 +60,7 @@
import static com.rethinkdb.RethinkDB.r;
import static no.nb.nna.veidemann.frontier.testutil.FrontierAssertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@Testcontainers
@@ -112,24 +118,34 @@ public class HarvestTest {
new OneShotStartupCheckStrategy().withTimeout(Duration.ofSeconds(60))
);
+ private String getStringProperty(String name, String def) {
+ String prop = System.getProperty(name);
+ return prop.isBlank() ? def : prop;
+ }
+
+ private int getIntProperty(String name, int def) {
+ String prop = System.getProperty(name);
+ return prop.isBlank() ? def : Integer.parseInt(prop);
+ }
+
@BeforeEach
public void setup() throws DbQueryException, DbConnectionException, IOException {
Settings settings = new Settings();
- settings.setDnsResolverHost(System.getProperty("dnsresolver.host", "localhost"));
- settings.setDnsResolverPort(Integer.parseInt(System.getProperty("dnsresolver.port", "9500")));
- settings.setRobotsEvaluatorHost(System.getProperty("robotsevaluator.host", "localhost"));
- settings.setRobotsEvaluatorPort(Integer.parseInt(System.getProperty("robotsevaluator.port", "9501")));
- settings.setOutOfScopeHandlerHost(System.getProperty("ooshandler.host", "localhost"));
- settings.setOutOfScopeHandlerPort(Integer.parseInt(System.getProperty("ooshandler.port", "9502")));
- settings.setScopeserviceHost(System.getProperty("scopeChecker.host", "localhost"));
- settings.setScopeservicePort(Integer.parseInt(System.getProperty("scopeChecker.port", "9503")));
+ settings.setDnsResolverHost(getStringProperty("dnsresolver.host", "localhost"));
+ settings.setDnsResolverPort(getIntProperty("dnsresolver.port", 9500));
+ settings.setRobotsEvaluatorHost(getStringProperty("robotsevaluator.host", "localhost"));
+ settings.setRobotsEvaluatorPort(getIntProperty("robotsevaluator.port", 9501));
+ settings.setOutOfScopeHandlerHost(getStringProperty("ooshandler.host", "localhost"));
+ settings.setOutOfScopeHandlerPort(getIntProperty("ooshandler.port", 9502));
+ settings.setScopeserviceHost(getStringProperty("scopeChecker.host", "localhost"));
+ settings.setScopeservicePort(getIntProperty("scopeChecker.port", 9503));
settings.setDbHost(rethinkDb.getHost());
settings.setDbPort(rethinkDb.getFirstMappedPort());
settings.setDbName("veidemann");
settings.setDbUser("admin");
settings.setDbPassword("");
settings.setBusyTimeout(Duration.ofSeconds(2));
- settings.setApiPort(Integer.parseInt(System.getProperty("frontier.port", "9504")));
+ settings.setApiPort(getIntProperty("frontier.port", 9504));
settings.setTerminationGracePeriodSeconds(10);
settings.setRedisHost(redis.getHost());
settings.setRedisPort(redis.getFirstMappedPort());
@@ -505,4 +521,286 @@ public void testDnsFailureThreeTimes() throws Exception {
assertThat(redisData)
.readyQueue().hasNumberOfElements(0);
}
+
+ @Test
+ public void testDnsExceptionThreeTimes() throws Exception {
+ int seedCount = 1;
+ int linksPerLevel = 0;
+ int maxHopsFromSeed = 1;
+
+ scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+ harvesterMock.withLinksPerLevel(linksPerLevel);
+ dnsResolverMock.withExceptionForAllHostRequests("stress-000000.com");
+
+ SetupCrawl c = new SetupCrawl();
+ c.setup(seedCount);
+
+ Instant testStart = Instant.now();
+
+ JobExecutionStatus jes = c.runCrawl(frontierStub);
+
+
+ await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).atMost(20, TimeUnit.SECONDS)
+ .until(() -> {
+ JobExecutionStatus j = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(jes.getId());
+ if (j.getExecutionsStateCount() > 0) {
+ System.out.println("STATE " + j.getExecutionsStateMap() + " :: " + j.getState());
+ System.out.println(jedisPool.getResource().keys("*") + " :: QCT=" + jedisPool.getResource().get("QCT"));
+ }
+ if (State.FINISHED == j.getState() && rethinkDbData.getQueuedUris().isEmpty()) {
+ return true;
+ }
+ return false;
+ });
+
+ Duration testTime = Duration.between(testStart, Instant.now());
+ System.out.println(String.format("Test time: %02d:%02d:%02d.%d",
+ testTime.toHoursPart(), testTime.toMinutesPart(), testTime.toSecondsPart(), testTime.toMillisPart()));
+
+ rethinkDbData.getCrawlLogs().forEach(cl -> System.out.println(String.format("Status: %3d %s %s", cl.getStatusCode(), cl.getRequestedUri(), cl.getError())));
+ assertThat(rethinkDbData)
+ .hasQueueTotalCount(0)
+ .crawlLogs().hasNumberOfElements(1);
+
+ assertThat(redisData)
+ .hasQueueTotalCount(0)
+ .crawlHostGroups().hasNumberOfElements(0);
+ assertThat(redisData)
+ .crawlExecutionQueueCounts().hasNumberOfElements(0);
+ assertThat(redisData)
+ .sessionTokens().hasNumberOfElements(0);
+ assertThat(redisData)
+ .readyQueue().hasNumberOfElements(0);
+ }
+
+ @Test
+ public void testAbortCrawlExecution() throws Exception {
+ int seedCount = 1;
+ int linksPerLevel = 3;
+ int maxHopsFromSeed = 2;
+
+ scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+ harvesterMock.withLinksPerLevel(linksPerLevel);
+
+ SetupCrawl c = new SetupCrawl();
+ c.setup(seedCount);
+
+ Instant testStart = Instant.now();
+
+ JobExecutionStatus jes = c.runCrawl(frontierStub);
+
+ // Abort the first execution as soon as it is created
+ String crawlExecutionId = c.crawlExecutions.get(c.seeds.get(0).getId()).get().getId();
+ DbService.getInstance().getExecutionsAdapter().setCrawlExecutionStateAborted(crawlExecutionId, CrawlExecutionStatus.State.ABORTED_MANUAL);
+
+ await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> {
+ JobExecutionStatus j = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(jes.getId());
+ if (j.getExecutionsStateCount() > 0) {
+ System.out.println("STATE " + j.getExecutionsStateMap() + " :: " + j.getState());
+ System.out.println(jedisPool.getResource().keys("*") + " :: QCT=" + jedisPool.getResource().get("QCT"));
+ }
+ if (State.RUNNING != j.getState() && rethinkDbData.getQueuedUris().isEmpty() && jedisPool.getResource().keys("*").size() <= 1) {
+ return true;
+ }
+ return false;
+ });
+
+ Duration testTime = Duration.between(testStart, Instant.now());
+ System.out.println(String.format("Test time: %02d:%02d:%02d.%d",
+ testTime.toHoursPart(), testTime.toMinutesPart(), testTime.toSecondsPart(), testTime.toMillisPart()));
+
+ assertThat(rethinkDbData)
+ .hasQueueTotalCount(0)
+ .crawlLogs().hasNumberOfElements(0);
+ assertThat(rethinkDbData)
+ .crawlExecutionStatuses().hasNumberOfElements(1)
+ .elementById(crawlExecutionId)
+ .hasState(CrawlExecutionStatus.State.ABORTED_MANUAL)
+ .hasStartTime(true)
+ .hasEndTime(true)
+ .hasStats(0, 0, 0, 0, 0)
+ .currentUriIdCountIsEqualTo(0);
+ assertThat(rethinkDbData)
+ .jobExecutionStatuses().hasNumberOfElements(1)
+ .elementById(jes.getId())
+ .hasState(JobExecutionStatus.State.FINISHED)
+ .hasStartTime(true)
+ .hasEndTime(true)
+ .hasStats(0, 0, 0, 0, 0);
+
+ assertThat(redisData)
+ .hasQueueTotalCount(0)
+ .crawlHostGroups().hasNumberOfElements(0);
+ assertThat(redisData)
+ .crawlExecutionQueueCounts().hasNumberOfElements(0);
+ assertThat(redisData)
+ .sessionTokens().hasNumberOfElements(0);
+ assertThat(redisData)
+ .readyQueue().hasNumberOfElements(0);
+ }
+
+ @Test
+ public void testAbortJobExecution() throws Exception {
+ int seedCount = 20;
+ int linksPerLevel = 3;
+ int maxHopsFromSeed = 2;
+
+ scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+ harvesterMock.withLinksPerLevel(linksPerLevel);
+ dnsResolverMock.withSimulatedLookupTimeMs(300);
+
+ SetupCrawl c = new SetupCrawl();
+ c.setup(seedCount);
+
+ Instant testStart = Instant.now();
+
+ JobExecutionStatus jes = c.runCrawl(frontierStub);
+
+ // Abort the first execution as soon as one seed is completed
+ await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> {
+ try (Jedis jedis = jedisPool.getResource()) {
+ Map f = jedis.hgetAll(CrawlQueueManager.JOB_EXECUTION_PREFIX + jes.getId());
+ if (!f.getOrDefault("FINISHED", "0").equals("0")) {
+ return true;
+ }
+ return false;
+ }
+ });
+ DbService.getInstance().getExecutionsAdapter().setJobExecutionStateAborted(jes.getId());
+
+ await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> {
+ try (Jedis jedis = jedisPool.getResource()) {
+ JobExecutionStatus j = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(jes.getId());
+ if (j.getExecutionsStateCount() > 0) {
+ System.out.println("STATE " + j.getExecutionsStateMap() + " :: " + j.getState());
+ System.out.println(jedisPool.getResource().keys("*") + " :: QCT=" + jedisPool.getResource().get("QCT"));
+ }
+ if (State.RUNNING != j.getState() && rethinkDbData.getQueuedUris().isEmpty() && jedis.keys("*").size() <= 1) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ Duration testTime = Duration.between(testStart, Instant.now());
+ System.out.println(String.format("Test time: %02d:%02d:%02d.%d",
+ testTime.toHoursPart(), testTime.toMinutesPart(), testTime.toSecondsPart(), testTime.toMillisPart()));
+
+ assertThat(rethinkDbData)
+ .hasQueueTotalCount(0)
+ .crawlLogs().hasNumberOfElements(0);
+ assertThat(rethinkDbData)
+ .crawlExecutionStatuses().hasNumberOfElements(20);
+ assertThat(rethinkDbData)
+ .jobExecutionStatuses().hasNumberOfElements(1)
+ .elementById(jes.getId())
+ .hasState(JobExecutionStatus.State.ABORTED_MANUAL)
+ .hasStartTime(true)
+ .hasEndTime(true)
+ .satisfies(j -> {
+ assertThat(j.getDocumentsCrawled()).isGreaterThan(0);
+ assertThat(j.getDocumentsDenied()).isEqualTo(0);
+ assertThat(j.getDocumentsFailed()).isEqualTo(0);
+ assertThat(j.getDocumentsRetried()).isEqualTo(0);
+ assertThat(j.getDocumentsOutOfScope()).isGreaterThan(0);
+ assertThat(j.getExecutionsStateMap()).satisfies(s -> {
+ assertThat(s.get(CrawlExecutionStatus.State.ABORTED_MANUAL.name())).isGreaterThan(0);
+ assertThat(s.get(CrawlExecutionStatus.State.ABORTED_TIMEOUT.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.ABORTED_SIZE.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.FINISHED.name())).isGreaterThan(0);
+ assertThat(s.get(CrawlExecutionStatus.State.FAILED.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.CREATED.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.FETCHING.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.SLEEPING.name())).isEqualTo(0);
+ });
+ });
+
+ assertThat(redisData)
+ .hasQueueTotalCount(0)
+ .crawlHostGroups().hasNumberOfElements(0);
+ assertThat(redisData)
+ .crawlExecutionQueueCounts().hasNumberOfElements(0);
+ assertThat(redisData)
+ .sessionTokens().hasNumberOfElements(0);
+ assertThat(redisData)
+ .readyQueue().hasNumberOfElements(0);
+ }
+
+ @Test
+ public void testAbortTimeout() throws Exception {
+ int seedCount = 20;
+ int linksPerLevel = 3;
+ int maxHopsFromSeed = 2;
+
+ scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+ harvesterMock.withLinksPerLevel(linksPerLevel);
+ dnsResolverMock.withSimulatedLookupTimeMs(300);
+
+ SetupCrawl c = new SetupCrawl();
+ c.setup(seedCount, CrawlLimitsConfig.newBuilder().setMaxDurationS(5).build());
+
+ Instant testStart = Instant.now();
+
+ JobExecutionStatus jes = c.runCrawl(frontierStub);
+
+ await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS)
+ .until(() -> {
+ try (Jedis jedis = jedisPool.getResource()) {
+ JobExecutionStatus j = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(jes.getId());
+ if (j.getExecutionsStateCount() > 0) {
+ System.out.println("STATE " + j.getExecutionsStateMap() + " :: " + j.getState());
+ }
+ if (State.RUNNING != j.getState() && rethinkDbData.getQueuedUris().isEmpty() && jedis.keys("*").size() <= 1) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ Duration testTime = Duration.between(testStart, Instant.now());
+ System.out.println(String.format("Test time: %02d:%02d:%02d.%d",
+ testTime.toHoursPart(), testTime.toMinutesPart(), testTime.toSecondsPart(), testTime.toMillisPart()));
+
+ assertThat(rethinkDbData)
+ .hasQueueTotalCount(0)
+ .crawlLogs().hasNumberOfElements(0);
+ assertThat(rethinkDbData)
+ .crawlExecutionStatuses().hasNumberOfElements(20);
+ assertThat(rethinkDbData)
+ .jobExecutionStatuses().hasNumberOfElements(1)
+ .elementById(jes.getId())
+ .hasState(JobExecutionStatus.State.FINISHED)
+ .hasStartTime(true)
+ .hasEndTime(true)
+ .satisfies(j -> {
+ assertThat(j.getDocumentsCrawled()).isGreaterThan(0);
+ assertThat(j.getDocumentsDenied()).isEqualTo(0);
+ assertThat(j.getDocumentsFailed()).isEqualTo(0);
+ assertThat(j.getDocumentsRetried()).isEqualTo(0);
+ assertThat(j.getDocumentsOutOfScope()).isGreaterThan(0);
+ assertThat(j.getExecutionsStateMap()).satisfies(s -> {
+ assertThat(s.get(CrawlExecutionStatus.State.ABORTED_MANUAL.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.ABORTED_TIMEOUT.name())).isGreaterThan(0);
+ assertThat(s.get(CrawlExecutionStatus.State.ABORTED_SIZE.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.FINISHED.name())).isGreaterThanOrEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.FAILED.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.CREATED.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.FETCHING.name())).isEqualTo(0);
+ assertThat(s.get(CrawlExecutionStatus.State.SLEEPING.name())).isEqualTo(0);
+ });
+ });
+
+ assertThat(redisData)
+ .hasQueueTotalCount(0)
+ .crawlHostGroups().hasNumberOfElements(0);
+ assertThat(redisData)
+ .crawlExecutionQueueCounts().hasNumberOfElements(0);
+ assertThat(redisData)
+ .sessionTokens().hasNumberOfElements(0);
+ assertThat(redisData)
+ .readyQueue().hasNumberOfElements(0);
+ }
}
\ No newline at end of file
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlExecutionStatusAssert.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlExecutionStatusAssert.java
new file mode 100644
index 0000000..3325506
--- /dev/null
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlExecutionStatusAssert.java
@@ -0,0 +1,100 @@
+package no.nb.nna.veidemann.frontier.testutil;
+
+import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
+import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus.State;
+
+import java.util.Objects;
+
+public class CrawlExecutionStatusAssert extends IdMappedAssert.FromMapAssert {
+
+ public CrawlExecutionStatusAssert(IdMappedAssert origin, CrawlExecutionStatus actual) {
+ super(origin, actual);
+ }
+
+ public CrawlExecutionStatusAssert hasState(State expected) {
+ isNotNull();
+ if (!Objects.equals(actual.getState(), expected)) {
+ failWithMessage("Expected state to be <%s> but was <%s>", expected, actual.getState());
+ }
+
+ return this;
+ }
+
+ public CrawlExecutionStatusAssert hasStats(long documentsCrawled, long documentsDenied, long documentsFailed,
+ long documentsRetried, long documentsOutOfScope) {
+ isNotNull();
+ if (actual.getDocumentsCrawled() != documentsCrawled) {
+ failWithMessage("Expected documentsCrawled to be <%d> but was <%d>", documentsCrawled, actual.getDocumentsCrawled());
+ }
+ if (actual.getDocumentsDenied() != documentsDenied) {
+ failWithMessage("Expected documentsDenied to be <%d> but was <%d>", documentsDenied, actual.getDocumentsDenied());
+ }
+ if (actual.getDocumentsFailed() != documentsFailed) {
+ failWithMessage("Expected documentsFailed to be <%d> but was <%d>", documentsFailed, actual.getDocumentsFailed());
+ }
+ if (actual.getDocumentsRetried() != documentsRetried) {
+ failWithMessage("Expected documentsRetried to be <%d> but was <%d>", documentsRetried, actual.getDocumentsRetried());
+ }
+ if (actual.getDocumentsOutOfScope() != documentsOutOfScope) {
+ failWithMessage("Expected documentsOutOfScope to be <%d> but was <%d>", documentsOutOfScope, actual.getDocumentsOutOfScope());
+ }
+
+ return this;
+ }
+
+ public CrawlExecutionStatusAssert hasCreatedTime(boolean expected) {
+ isNotNull();
+ if (actual.hasCreatedTime() != expected) {
+ failWithMessage("Expected hasCreatedTime to be <%b> but was <%b>", expected, actual.getCreatedTime());
+ }
+
+ return this;
+ }
+
+ public CrawlExecutionStatusAssert hasStartTime(boolean expected) {
+ isNotNull();
+ if (actual.hasStartTime() != expected) {
+ failWithMessage("Expected hasStartTime to be <%b> but was <%b>", expected, actual.hasStartTime());
+ }
+
+ return this;
+ }
+
+ public CrawlExecutionStatusAssert hasEndTime(boolean expected) {
+ isNotNull();
+ if (actual.hasEndTime() != expected) {
+ failWithMessage("Expected hasEndTime to be <%b> but was <%b>", expected, actual.hasEndTime());
+ }
+
+ return this;
+ }
+
+ public CrawlExecutionStatusAssert currentUriIdCountIsEqualTo(int expected) {
+ isNotNull();
+ if (actual.getCurrentUriIdCount() != expected) {
+ failWithMessage("Expected currentUriIdCount to be <%s> but was <%s>", expected, actual.getCurrentUriIdCount());
+ }
+
+ return this;
+ }
+
+// public CrawlExecutionStatusAssert next() {
+// return new CrawlExecutionStatusAssert(statusIterator);
+// }
+
+// public CrawlExecutionStatusAssert hasCrawlExecutionStatuses(EssentialQueuedUriFields... expected) {
+// isNotNull();
+//
+// List actualEessentialQueuedUriFields = actual.queuedUriList.stream()
+// .map(qUri -> new EssentialQueuedUriFields(qUri.getUri(), qUri.getSeedUri(), qUri.getIp(), !qUri.getCrawlHostGroupId().isBlank()))
+// .collect(Collectors.toList());
+//
+// for (EssentialQueuedUriFields e : expected) {
+// if (!actualEessentialQueuedUriFields.contains(e)) {
+// failWithMessage("Expected Queued Uri's to contain <%s> but did not", e);
+// }
+// }
+//
+// return this;
+// }
+}
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java
index 1abb387..84b55e7 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java
@@ -38,6 +38,8 @@ public class DnsResolverMock implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DnsResolverMock.class);
Pattern seedNumPattern = Pattern.compile("stress-(\\d\\d)(\\d\\d)(\\d\\d).com");
+ long simulatedLookupTimeMs = 0L;
+
public final RequestLog requestLog = new RequestLog();
private final RequestMatcher exceptionForHost = new RequestMatcher(requestLog);
private final RequestMatcher fetchErrorForHost = new RequestMatcher(requestLog);
@@ -79,6 +81,11 @@ public DnsResolverMock withFetchErrorForHostRequests(String host, int from, int
return this;
}
+ public DnsResolverMock withSimulatedLookupTimeMs(long time) {
+ this.simulatedLookupTimeMs = time;
+ return this;
+ }
+
public class DnsService extends DnsResolverGrpc.DnsResolverImplBase {
@Override
public void resolve(ResolveRequest request, StreamObserver responseObserver) {
@@ -95,6 +102,12 @@ public void resolve(ResolveRequest request, StreamObserver respons
return;
}
+ try {
+ Thread.sleep(simulatedLookupTimeMs);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
Matcher m = seedNumPattern.matcher(request.getHost());
if (!m.matches()) {
System.out.println("Regex error");
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/IdMappedAssert.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/IdMappedAssert.java
new file mode 100644
index 0000000..3d58560
--- /dev/null
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/IdMappedAssert.java
@@ -0,0 +1,48 @@
+package no.nb.nna.veidemann.frontier.testutil;
+
+import no.nb.nna.veidemann.frontier.testutil.IdMappedAssert.FromMapAssert;
+import org.assertj.core.api.AbstractAssert;
+
+import java.util.Map;
+
+public class IdMappedAssert, MAPPED>
+ extends AbstractAssert, Map> {
+
+ protected final Class mappedType;
+ protected final Class assertType;
+
+ public IdMappedAssert(Map actual, Class assertType, Class mappedType) {
+ super(actual, IdMappedAssert.class);
+ this.mappedType = mappedType;
+ this.assertType = assertType;
+ }
+
+ public IdMappedAssert hasNumberOfElements(int expected) {
+ if (actual.size() != expected) {
+ failWithMessage("Expected number of CrawlHostGroups to be <%d>, but was <%d>",
+ expected, actual.size());
+ }
+ return this;
+ }
+
+ public ASSERT elementById(String id) {
+ try {
+ return (ASSERT) assertType.getDeclaredConstructor(IdMappedAssert.class, mappedType).newInstance(this, actual.get(id));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class FromMapAssert, MAPPED> extends AbstractAssert {
+ final IdMappedAssert origin;
+
+ public FromMapAssert(IdMappedAssert origin, MAPPED actual) {
+ super(actual, FromMapAssert.class);
+ this.origin = origin;
+ }
+
+ public SELF elementById(String id) {
+ return (SELF) origin.elementById(id);
+ }
+ }
+}
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/JobExecutionStatusAssert.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/JobExecutionStatusAssert.java
new file mode 100644
index 0000000..bed04d4
--- /dev/null
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/JobExecutionStatusAssert.java
@@ -0,0 +1,62 @@
+package no.nb.nna.veidemann.frontier.testutil;
+
+import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
+import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus.State;
+
+import java.util.Objects;
+
+public class JobExecutionStatusAssert extends IdMappedAssert.FromMapAssert {
+
+ public JobExecutionStatusAssert(IdMappedAssert origin, JobExecutionStatus actual) {
+ super(origin, actual);
+ }
+
+ public JobExecutionStatusAssert hasState(State expected) {
+ isNotNull();
+ if (!Objects.equals(actual.getState(), expected)) {
+ failWithMessage("Expected state to be <%s> but was <%s>", expected, actual.getState());
+ }
+
+ return this;
+ }
+
+ public JobExecutionStatusAssert hasStats(long documentsCrawled, long documentsDenied, long documentsFailed,
+ long documentsRetried, long documentsOutOfScope) {
+ isNotNull();
+ if (actual.getDocumentsCrawled() != documentsCrawled) {
+ failWithMessage("Expected documentsCrawled to be <%d> but was <%d>", documentsCrawled, actual.getDocumentsCrawled());
+ }
+ if (actual.getDocumentsDenied() != documentsDenied) {
+ failWithMessage("Expected documentsDenied to be <%d> but was <%d>", documentsDenied, actual.getDocumentsDenied());
+ }
+ if (actual.getDocumentsFailed() != documentsFailed) {
+ failWithMessage("Expected documentsFailed to be <%d> but was <%d>", documentsFailed, actual.getDocumentsFailed());
+ }
+ if (actual.getDocumentsRetried() != documentsRetried) {
+ failWithMessage("Expected documentsRetried to be <%d> but was <%d>", documentsRetried, actual.getDocumentsRetried());
+ }
+ if (actual.getDocumentsOutOfScope() != documentsOutOfScope) {
+ failWithMessage("Expected documentsOutOfScope to be <%d> but was <%d>", documentsOutOfScope, actual.getDocumentsOutOfScope());
+ }
+
+ return this;
+ }
+
+ public JobExecutionStatusAssert hasStartTime(boolean expected) {
+ isNotNull();
+ if (actual.hasStartTime() != expected) {
+ failWithMessage("Expected hasStartTime to be <%b> but was <%b>", expected, actual.hasStartTime());
+ }
+
+ return this;
+ }
+
+ public JobExecutionStatusAssert hasEndTime(boolean expected) {
+ isNotNull();
+ if (actual.hasEndTime() != expected) {
+ failWithMessage("Expected hasEndTime to be <%b> but was <%b>", expected, actual.hasEndTime());
+ }
+
+ return this;
+ }
+}
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/RethinkDbData.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/RethinkDbData.java
index 7112998..a602f03 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/testutil/RethinkDbData.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/RethinkDbData.java
@@ -34,19 +34,19 @@ public List getQueuedUris() throws DbQueryException, DbConnectionExce
}
}
- public List getCrawlExecutionStatuses() throws DbQueryException, DbConnectionException {
+ public Map getCrawlExecutionStatuses() throws DbQueryException, DbConnectionException {
try (Cursor