Skip to content

Commit

Permalink
Merge pull request #40 from nlnwa/politeness
Browse files Browse the repository at this point in the history
Abort crawl
  • Loading branch information
Langvann authored Apr 7, 2021
2 parents 6a32ace + d9d7b61 commit 8105a7c
Show file tree
Hide file tree
Showing 26 changed files with 689 additions and 163 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<maven.compiler.target>11</maven.compiler.target>
<docker.tag>${project.version}</docker.tag>

<veidemann.api.version>1.0.0-beta21</veidemann.api.version>
<veidemann.api.version>1.0.0-beta22</veidemann.api.version>
<veidemann.commons.version>0.4.9</veidemann.commons.version>
<veidemann.rethinkdbadapter.version>0.6.0</veidemann.rethinkdbadapter.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import no.nb.nna.veidemann.commons.client.OutOfScopeHandlerClient;
import no.nb.nna.veidemann.commons.client.RobotsServiceClient;
import no.nb.nna.veidemann.commons.db.DbService;
import no.nb.nna.veidemann.commons.opentracing.TracerFactory;
import no.nb.nna.veidemann.frontier.api.FrontierApiServer;
import no.nb.nna.veidemann.frontier.settings.Settings;
import no.nb.nna.veidemann.frontier.worker.DnsServiceClient;
Expand Down Expand Up @@ -60,7 +59,8 @@ public class FrontierService {
config.checkValid(ConfigFactory.defaultReference());
SETTINGS = ConfigBeanFactory.create(config, Settings.class);

TracerFactory.init("Frontier");
// TODO: Add tracing
// TracerFactory.init("Frontier");

asyncFunctionsExecutor = new ThreadPoolExecutor(2, 128, 15, TimeUnit.SECONDS,
new SynchronousQueue<>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.services.HealthStatusManager;
import io.opentracing.contrib.ServerTracingInterceptor;
import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.frontier.v1.FrontierGrpc;
import no.nb.nna.veidemann.frontier.worker.Frontier;
import org.slf4j.Logger;
Expand Down Expand Up @@ -55,18 +53,21 @@ public FrontierApiServer(int port, int shutdownTimeoutSeconds, Frontier frontier
}

public FrontierApiServer(ServerBuilder<?> serverBuilder, Frontier frontier) {
ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor.Builder(GlobalTracer.get())
.withTracedAttributes(ServerTracingInterceptor.ServerRequestAttribute.CALL_ATTRIBUTES,
ServerTracingInterceptor.ServerRequestAttribute.METHOD_TYPE)
.build();
// TODO: Add tracing
// ServerTracingInterceptor tracingInterceptor = new ServerTracingInterceptor.Builder(GlobalTracer.get())
// .withTracedAttributes(ServerTracingInterceptor.ServerRequestAttribute.CALL_ATTRIBUTES,
// ServerTracingInterceptor.ServerRequestAttribute.METHOD_TYPE)
// .build();

healthCheckerExecutorService = Executors.newScheduledThreadPool(1);
health = new HealthStatusManager();
healthCheckerExecutorService.scheduleAtFixedRate(new HealthChecker(frontier, health), 0, 1, TimeUnit.SECONDS);

frontierService = new FrontierService(frontier);
server = serverBuilder
.addService(ServerInterceptors.intercept(tracingInterceptor.intercept(frontierService),
// TODO: Add tracing
// .addService(ServerInterceptors.intercept(tracingInterceptor.intercept(frontierService),
.addService(ServerInterceptors.intercept(frontierService,
ConcurrencyLimitServerInterceptor.newBuilder(
new GrpcServerLimiterBuilder()
.partitionByMethod()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.opentracing.ActiveSpan;
import io.opentracing.contrib.OpenTracingContextKey;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.frontier.v1.CountResponse;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionId;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
Expand Down Expand Up @@ -67,13 +63,15 @@ public void awaitTermination() throws InterruptedException {

@Override
public void crawlSeed(CrawlSeedRequest request, StreamObserver<CrawlExecutionId> responseObserver) {
try (ActiveSpan span = GlobalTracer.get()
.buildSpan("scheduleSeed")
.asChildOf(OpenTracingContextKey.activeSpan())
.withTag(Tags.COMPONENT.getKey(), "Frontier")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
.withTag("uri", request.getSeed().getMeta().getName())
.startActive()) {
// TODO: Add tracing
// try (ActiveSpan span = GlobalTracer.get()
// .buildSpan("scheduleSeed")
// .asChildOf(OpenTracingContextKey.activeSpan())
// .withTag(Tags.COMPONENT.getKey(), "Frontier")
// .withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
// .withTag("uri", request.getSeed().getMeta().getName())
// .startActive()) {
try {
Futures.addCallback(ctx.getFrontier().scheduleSeed(request),
new FutureCallback<CrawlExecutionStatus>() {
public void onSuccess(CrawlExecutionStatus reply) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void onNext(PageHarvest value) {

@Override
public void onError(Throwable t) {
LOG.warn("gRPC Error from harvester", t);
try {
try {
if (postFetchHandler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ private QueuedUri getNextQueuedUriToFetch() {
LOG.trace("Found Crawl Host Group ({})", chgId);

// try to find URI for CrawlHostGroup
FutureOptional<QueuedUri> foqu = getNextFetchableQueuedUriToForCrawlHostGroup(jedisContext, chg, conn);
FutureOptional<QueuedUri> foqu = getNextFetchableQueuedUriForCrawlHostGroup(jedisContext, chg, conn);

if (foqu.isPresent()) {
LOG.debug("Found Queued URI: {}, crawlHostGroup: {}",
Expand Down Expand Up @@ -358,7 +358,7 @@ public static String uriHash(String uri) {
return Hashing.sha256().hashUnencodedChars(uri).toString();
}

FutureOptional<QueuedUri> getNextFetchableQueuedUriToForCrawlHostGroup(JedisContext ctx, CrawlHostGroup crawlHostGroup, RethinkDbConnection conn) {
FutureOptional<QueuedUri> getNextFetchableQueuedUriForCrawlHostGroup(JedisContext ctx, CrawlHostGroup crawlHostGroup, RethinkDbConnection conn) {
NextUriScriptResult res = nextUriScript.run(ctx, crawlHostGroup);
if (res.future != null) {
return res.future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import no.nb.nna.veidemann.frontier.db.script.ChgDelayedQueueScript;
import no.nb.nna.veidemann.frontier.db.script.RedisJob.JedisContext;
import no.nb.nna.veidemann.frontier.worker.Frontier;
import no.nb.nna.veidemann.frontier.worker.StatusWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
Expand Down Expand Up @@ -96,21 +95,10 @@ public void run() {
String toBeRemoved = ctx.getJedis().lpop(CRAWL_EXECUTION_TIMEOUT_KEY);
while (toBeRemoved != null) {
try {
StatusWrapper s = StatusWrapper.getStatusWrapper(frontier, toBeRemoved);
switch (s.getState()) {
case SLEEPING:
case CREATED:
LOG.debug("CrawlExecution '{}' with state {} timed out", s.getId(), s.getState());
s.incrementDocumentsDenied(frontier.getCrawlQueueManager()
.deleteQueuedUrisForExecution(ctx, toBeRemoved))
.setEndState(State.ABORTED_TIMEOUT)
.saveStatus();
break;
default:
LOG.trace("CrawlExecution '{}' with state {} was already finished", s.getId(), s.getState());
}
conn.getExecutionsAdapter().setCrawlExecutionStateAborted(toBeRemoved, State.ABORTED_TIMEOUT);
} catch (Exception e) {
// Don't worry execution will be deleted at some point later
ctx.getJedis().rpush(CRAWL_EXECUTION_TIMEOUT_KEY, toBeRemoved);
}

toBeRemoved = ctx.getJedis().lpop(CRAWL_EXECUTION_TIMEOUT_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ public JobExecutionStatus run(JedisContext ctx, String jobExecutionId) {
return execute(ctx, jedis -> {
String key = JOB_EXECUTION_PREFIX + jobExecutionId;

if (!jedis.exists(key)) {
return null;
}
JobExecutionStatusRedisMapper m = new JobExecutionStatusRedisMapper(jedis.hgetAll(key));

return m.toJobExecutionStatus(jobExecutionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class CrawlExecutionHelpers {
public static void postFetchFinally(Frontier frontier, StatusWrapper status, QueuedUriWrapper qUri, long delayMs) {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());

try {
if (qUri.hasError() && qUri.getDiscoveryPath().isEmpty()) {
if (qUri.getError().getCode() == ExtraStatusCodes.PRECLUDED_BY_ROBOTS.getCode()) {
Expand Down Expand Up @@ -99,15 +98,12 @@ public static void endCrawl(Frontier frontier, StatusWrapper status, State state
}

public static boolean isAborted(Frontier frontier, StatusWrapper status) throws DbException {
switch (status.getState()) {
switch (status.getDesiredState()) {
case ABORTED_MANUAL:
case ABORTED_TIMEOUT:
case ABORTED_SIZE:
status.incrementDocumentsDenied(frontier.getCrawlQueueManager()
.deleteQueuedUrisForExecution(status.getId()));

// Re-set end state to ensure end time is updated
status.setEndState(status.getState()).saveStatus();
// Set end state to desired state
status.setEndState(status.getDesiredState()).saveStatus();
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.opentracing.contrib.ClientTracingInterceptor;
import io.opentracing.util.GlobalTracer;
import no.nb.nna.veidemann.api.config.v1.ConfigRef;
import no.nb.nna.veidemann.api.dnsresolver.v1.DnsResolverGrpc;
import no.nb.nna.veidemann.api.dnsresolver.v1.ResolveReply;
Expand Down Expand Up @@ -58,8 +56,10 @@ public DnsServiceClient(final String host, final int port, ExecutorService execu

public DnsServiceClient(ManagedChannelBuilder<?> channelBuilder, ExecutorService executor) {
LOG.debug("Setting up DNS service client");
ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
channel = channelBuilder.intercept(tracingInterceptor).build();
// TODO: Add tracing
// ClientTracingInterceptor tracingInterceptor = new ClientTracingInterceptor.Builder(GlobalTracer.get()).build();
// channel = channelBuilder.intercept(tracingInterceptor).build();
channel = channelBuilder.build();
futureStub = DnsResolverGrpc.newFutureStub(channel);
this.executor = executor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper statu
Futures.transformAsync(future, c -> {
switch (c) {
case DENIED:
if (qUri.shouldInclude()) {
if (status.getState() == State.ABORTED_MANUAL) {
// Job was aborted before crawl execution was created. Ignore
} else if (qUri.shouldInclude()) {
// Seed was in scope, but failed for other reason
LOG.warn("Seed '{}' could not be crawled. Error: {}", qUri.getUri(), qUri.getError());
status.setEndState(State.FAILED)
Expand Down Expand Up @@ -227,6 +229,7 @@ public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper statu
.setError(ExtraStatusCodes.ILLEGAL_URI.toFetchError(ex.toString()))
.saveStatus();
} catch (Exception ex) {
LOG.warn(ex.toString(), ex);
status.incrementDocumentsFailed()
.setEndState(CrawlExecutionStatus.State.FAILED)
.setError(ExtraStatusCodes.RUNTIME_EXCEPTION.toFetchError(ex.toString()))
Expand Down Expand Up @@ -283,6 +286,9 @@ public CrawlExecutionStatus createCrawlExecutionStatus(String jobId, String jobE
Map rMap = ProtoUtils.protoToRethink(status);
rMap.put("lastChangeTime", r.now());
rMap.put("createdTime", r.now());
// Set desiredState to ABORTED_MANUAL if JobExecution has desiredState ABORTED_MANUAL.
rMap.put("desiredState", r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId).g("desiredState").default_("")
.do_(j -> r.branch(j.eq("ABORTED_MANUAL"), "ABORTED_MANUAL", "UNDEFINED")));

crawlQueueManager.updateJobExecutionStatus(jobExecutionId, State.UNDEFINED, State.CREATED, CrawlExecutionStatusChange.getDefaultInstance());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.opentracing.Span;
import no.nb.nna.veidemann.api.commons.v1.Error;
import no.nb.nna.veidemann.api.config.v1.Annotation;
import no.nb.nna.veidemann.api.config.v1.ConfigObject;
Expand Down Expand Up @@ -58,7 +57,7 @@ public class PostFetchHandler {
private long delayMs = 0L;
private long fetchTimeMs = 0L;

private Span span;
// private Span span;

private AtomicBoolean done = new AtomicBoolean();
private AtomicBoolean finalized = new AtomicBoolean();
Expand Down Expand Up @@ -160,30 +159,35 @@ public void postFetchFinally() {
try {
calculateDelay();
} catch (DbException e) {
e.printStackTrace();
LOG.error(e.toString(), e);
}

frontier.postFetchThreadPool.submit(() -> {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());


// Handle outlinks
for (QueuedUri outlink : outlinkQueue) {
try {
OutlinkHandler.processOutlink(frontier, status, qUri, outlink, scriptParameters, status.getCrawlJobConfig().getCrawlJob().getScopeScriptRef());
} catch (DbException e) {
// An error here indicates problems with DB communication. No idea how to handle that yet.
LOG.error("Error processing outlink: {}", e.toString(), e);
} catch (Throwable e) {
// Catch everything to ensure crawl host group gets released.
// Discovering this message in logs should be investigated as a possible bug.
LOG.error("Unknown error while processing outlink. Might be a bug", e);
try {
if (!CrawlExecutionHelpers.isAborted(frontier, status)) {
// Handle outlinks
for (QueuedUri outlink : outlinkQueue) {
try {
OutlinkHandler.processOutlink(frontier, status, qUri, outlink, scriptParameters, status.getCrawlJobConfig().getCrawlJob().getScopeScriptRef());
} catch (DbException e) {
// An error here indicates problems with DB communication. No idea how to handle that yet.
LOG.error("Error processing outlink: {}", e.toString(), e);
} catch (Throwable e) {
// Catch everything to ensure crawl host group gets released.
// Discovering this message in logs should be investigated as a possible bug.
LOG.error("Unknown error while processing outlink. Might be a bug", e);
}
}
}
} catch (DbException e) {
LOG.error(e.toString(), e);
}

CrawlExecutionHelpers.postFetchFinally(frontier, status, qUri, getDelay(TimeUnit.MILLISECONDS));
span.finish();
// span.finish();
});
}
}
Expand Down
Loading

0 comments on commit 8105a7c

Please sign in to comment.