Merge pull request #58 from nlnwa/abolish-crawl-execution-current-uri-id
Stop using currentUriId field in StatusWrapper and remove crawl execution workers
johnerikhalse authored Dec 13, 2021
2 parents fe99ad0 + 7668523 commit 7ef77c2
Showing 10 changed files with 48 additions and 195 deletions.
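The change in one picture: queue membership used to be mirrored into each crawl execution's status through StatusWrapper's addCurrentUri/removeCurrentUri (the currentUriId field named in the commit title); after this commit the crawl queue itself is the single source of truth and call sites talk to CrawlQueueManager directly. A minimal before/after sketch with hand-written stand-in types — the real signatures are in the diffs below:

// Stand-ins only; the real classes live under no.nb.nna.veidemann.frontier.
interface QueuedUriWrapper {}

interface StatusWrapper {
    StatusWrapper setState(String state); // simplified; the real code uses a State enum
    void saveStatus();
    // addCurrentUri(...) and removeCurrentUri(...) are gone after this commit
}

interface CrawlQueueManager {
    boolean removeQUri(QueuedUriWrapper qUri); // queue bookkeeping lives here now
}

class CallSiteSketch {
    static void onFetchDone(CrawlQueueManager queue, StatusWrapper status, QueuedUriWrapper qUri) {
        // Before: status.removeCurrentUri(qUri).saveStatus();
        // After: remove from the queue explicitly, then persist the status.
        queue.removeQUri(qUri);
        status.saveStatus();
    }
}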
@@ -122,7 +122,7 @@ public CrawlQueueManager(Frontier frontier, RethinkDbConnection conn, JedisPool
jobExecutionGetScript = new JobExecutionGetScript();
jobExecutionUpdateScript = new JobExecutionUpdateScript();

this.crawlQueueWorker = new CrawlQueueWorker(frontier, conn, jedisPool);
this.crawlQueueWorker = new CrawlQueueWorker(frontier, jedisPool);
this.nextFetchSupplier = new TimeoutSupplier<>(64, 15, TimeUnit.SECONDS, 6,
() -> getPrefetchHandler(), p -> releaseCrawlHostGroup(p.getQueuedUri().getCrawlHostGroupId(), RESCHEDULE_DELAY));
}
@@ -427,8 +427,12 @@ public boolean removeTmpCrawlHostGroup(QueuedUri qUri, String tmpChgId, boolean
return removeQUri(qUri, tmpChgId, deleteUri);
}

public boolean removeQUri(QueuedUri qUri) {
return removeQUri(qUri, qUri.getCrawlHostGroupId(), true);
public boolean removeQUri(QueuedUriWrapper qUri) {
QueuedUri toBeRemoved = qUri.getQueuedUriForRemoval();
if (toBeRemoved.getId().isEmpty()) {
return false;
}
return removeQUri(qUri.getQueuedUriForRemoval(), qUri.getCrawlHostGroupId(), true);
}

private boolean removeQUri(QueuedUri qUri, String chgId, boolean deleteUri) {
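Worth noting in the new removeQUri(QueuedUriWrapper) overload above: protobuf string fields default to the empty string, so a QueuedUri that was never persisted carries an empty id and there is nothing to delete. A small hedged sketch of that guard in isolation (QueuedUri is the generated proto class; the helper name is mine):

import no.nb.nna.veidemann.api.frontier.v1.QueuedUri;

class RemovalGuardSketch {
    // Returns false for URIs that never reached the database, mirroring the
    // early return in removeQUri above.
    static boolean hasPersistedId(QueuedUri toBeRemoved) {
        return !toBeRemoved.getId().isEmpty();
    }
}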
140 changes: 1 addition & 139 deletions src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueWorker.java
@@ -1,110 +1,31 @@
package no.nb.nna.veidemann.frontier.db;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.rethinkdb.RethinkDB;
import no.nb.nna.veidemann.api.commons.v1.Error;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus.State;
import no.nb.nna.veidemann.api.frontier.v1.CrawlHostGroup;
import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
import no.nb.nna.veidemann.commons.ExtraStatusCodes;
import no.nb.nna.veidemann.commons.db.DbException;
import no.nb.nna.veidemann.commons.db.DbService;
import no.nb.nna.veidemann.db.ProtoUtils;
import no.nb.nna.veidemann.db.RethinkDbConnection;
import no.nb.nna.veidemann.db.Tables;
import no.nb.nna.veidemann.frontier.db.script.ChgBusyTimeoutScript;
import no.nb.nna.veidemann.frontier.db.script.ChgDelayedQueueScript;
import no.nb.nna.veidemann.frontier.db.script.RedisJob.JedisContext;
import no.nb.nna.veidemann.frontier.worker.Frontier;
import no.nb.nna.veidemann.frontier.worker.PostFetchHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.CHG_BUSY_KEY;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.CHG_READY_KEY;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.CHG_TIMEOUT_KEY;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.CHG_WAIT_KEY;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.CRAWL_EXECUTION_RUNNING_KEY;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.CRAWL_EXECUTION_TIMEOUT_KEY;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.JOB_EXECUTION_PREFIX;
import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.REMOVE_URI_QUEUE_KEY;

public class CrawlQueueWorker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CrawlQueueWorker.class);

static final RethinkDB r = RethinkDB.r;

private final Frontier frontier;
private final RethinkDbConnection conn;
private final JedisPool jedisPool;
private final ChgDelayedQueueScript delayedChgQueueScript;
private final ChgBusyTimeoutScript chgBusyTimeoutScript;
private final ScheduledExecutorService executor;

Runnable chgQueueWorker = new Runnable() {
@Override
public void run() {
try (JedisContext ctx = JedisContext.forPool(jedisPool)) {
Long moved = delayedChgQueueScript.run(ctx, CHG_WAIT_KEY, CHG_READY_KEY);
if (moved > 0) {
LOG.debug("{} CrawlHostGroups moved from wait state to ready state", moved);
}

moved = delayedChgQueueScript.run(ctx, CHG_BUSY_KEY, CHG_TIMEOUT_KEY);
if (moved > 0) {
LOG.warn("{} CrawlHostGroups moved from busy state to wait state", moved);
}

moved = delayedChgQueueScript.run(ctx, CRAWL_EXECUTION_RUNNING_KEY, CRAWL_EXECUTION_TIMEOUT_KEY);
if (moved > 0) {
LOG.debug("{} CrawlExecutions moved from running state to timeout state", moved);
}
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable t) {
t.printStackTrace();
System.exit(1);
}
}
};

Runnable removeUriQueueWorker = new Runnable() {
@Override
public void run() {
try (Jedis jedis = jedisPool.getResource()) {
List<String> toBeRemoved = jedis.lrange(REMOVE_URI_QUEUE_KEY, 0, 9999);
if (!toBeRemoved.isEmpty()) {
// Remove queued uris from DB
long deleted = conn.exec("db-deleteQueuedUri",
r.table(Tables.URI_QUEUE.name)
.getAll(toBeRemoved.toArray())
.delete().optArg("durability", "soft")
.g("deleted")
);
Pipeline p = jedis.pipelined();
for (String uriId : toBeRemoved) {
p.lrem(REMOVE_URI_QUEUE_KEY, 1, uriId);
}
p.sync();
LOG.debug("Deleted {} URIs from crawl queue", deleted);
}
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable t) {
t.printStackTrace();
System.exit(1);
}
}
};

Runnable fetchTimeoutWorker = new Runnable() {
@Override
public void run() {
@@ -134,30 +55,6 @@ public void run() {
}
};

Runnable crawlExecutionTimeoutWorker = new Runnable() {
@Override
public void run() {
try (JedisContext ctx = JedisContext.forPool(jedisPool)) {
String toBeRemoved = ctx.getJedis().lpop(CRAWL_EXECUTION_TIMEOUT_KEY);
while (toBeRemoved != null) {
try {
conn.getExecutionsAdapter().setCrawlExecutionStateAborted(toBeRemoved, State.ABORTED_TIMEOUT);
} catch (Exception e) {
// Don't worry execution will be deleted at some point later
ctx.getJedis().rpush(CRAWL_EXECUTION_TIMEOUT_KEY, toBeRemoved);
}

toBeRemoved = ctx.getJedis().lpop(CRAWL_EXECUTION_TIMEOUT_KEY);
}
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable t) {
t.printStackTrace();
System.exit(1);
}
}
};

Runnable checkPaused = new Runnable() {
@Override
public void run() {
@@ -174,48 +71,13 @@ public void run() {
}
};

Runnable updateJobExecutions = new Runnable() {
@Override
public void run() {
try (JedisContext ctx = JedisContext.forPool(jedisPool)) {
ctx.getJedis().keys(JOB_EXECUTION_PREFIX + "*").stream()
.map(key -> key.substring(JOB_EXECUTION_PREFIX.length()))
.forEach(jobExecutionId -> {
JobExecutionStatus tjes = frontier.getCrawlQueueManager().getTempJobExecutionStatus(ctx, jobExecutionId);
try {
conn.exec("db-saveJobExecutionStatus",
r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId).update(doc ->
r.branch(doc.g("state").match("FINISHED|ABORTED_TIMEOUT|ABORTED_SIZE|ABORTED_MANUAL|FAILED|DIED"),
doc,
ProtoUtils.protoToRethink(tjes))
));
} catch (DbException e) {
LOG.warn("Could not update jobExecutionState", e);
}
});
} catch (Exception e) {
e.printStackTrace();
} catch (Throwable t) {
t.printStackTrace();
System.exit(1);
}
}
};

public CrawlQueueWorker(Frontier frontier, RethinkDbConnection conn, JedisPool jedisPool) {
public CrawlQueueWorker(Frontier frontier, JedisPool jedisPool) {
this.frontier = frontier;
this.conn = conn;
this.jedisPool = jedisPool;
executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("CrawlQueueWorker-%d").build());

delayedChgQueueScript = new ChgDelayedQueueScript();
chgBusyTimeoutScript = new ChgBusyTimeoutScript();
executor.scheduleWithFixedDelay(chgQueueWorker, 400, 50, TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(removeUriQueueWorker, 1000, 200, TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(fetchTimeoutWorker, 1200, 500, TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(crawlExecutionTimeoutWorker, 1100, 1100, TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(checkPaused, 3, 3, TimeUnit.SECONDS);
executor.scheduleWithFixedDelay(updateJobExecutions, 5, 5, TimeUnit.SECONDS);
}

@Override
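With the RethinkDbConnection dependency gone, CrawlQueueWorker keeps only Redis-side maintenance tasks. A self-contained sketch of the scheduling pattern the constructor uses — the task bodies are placeholders, the delays copy the diff, and which tasks survive beyond these three is my reading of the deletions:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SchedulingSketch {
    public static void main(String[] args) {
        // Single-threaded pool, named threads, each task run with a fixed delay.
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(
                1, new ThreadFactoryBuilder().setNameFormat("CrawlQueueWorker-%d").build());
        Runnable chgQueueWorker = () -> { /* move CrawlHostGroups between wait/busy/ready keys */ };
        Runnable fetchTimeoutWorker = () -> { /* requeue fetches that timed out */ };
        Runnable checkPaused = () -> { /* honour a frontier-wide pause flag */ };
        executor.scheduleWithFixedDelay(chgQueueWorker, 400, 50, TimeUnit.MILLISECONDS);
        executor.scheduleWithFixedDelay(fetchTimeoutWorker, 1200, 500, TimeUnit.MILLISECONDS);
        executor.scheduleWithFixedDelay(checkPaused, 3, 3, TimeUnit.SECONDS);
    }
}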
@@ -215,6 +215,7 @@ public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper statu
}

ListenableFuture<PreconditionState> future = Preconditions.checkPreconditions(this, crawlConfig, status, qUri);

Futures.transformAsync(future, c -> {
switch (c) {
case DENIED:
@@ -246,7 +247,7 @@ public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper statu

// Prefetch ok, add to queue
try {
boolean wasAdded = qUri.addUriToQueue(status);
boolean wasAdded = qUri.addUriToQueue();
if (wasAdded) {
LOG.debug("Seed '{}' added to queue", qUri.getUri());
} else {
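The seed path above hands the precondition result through Guava's transformAsync. A minimal runnable sketch of that pattern with stand-in names — the PreconditionState values are taken from the DENIED/RETRY/OK cases visible in the diffs:

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

class PreconditionSketch {
    enum PreconditionState { OK, DENIED, RETRY }

    public static void main(String[] args) {
        SettableFuture<PreconditionState> future = SettableFuture.create();
        // Branch on the asynchronously resolved precondition state.
        ListenableFuture<Boolean> queued = Futures.transformAsync(future, state -> {
            switch (state) {
                case DENIED:
                    return Futures.immediateFuture(false); // drop the URI
                default:
                    // OK / RETRY: queue the URI; note that addUriToQueue()
                    // no longer needs the StatusWrapper after this commit.
                    return Futures.immediateFuture(true);
            }
        }, MoreExecutors.directExecutor());
        future.set(PreconditionState.OK); // simulate the precondition check resolving
    }
}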
@@ -72,14 +72,14 @@ public static boolean processOutlink(Frontier frontier, StatusWrapper status, Qu
case OK:
LOG.debug("Found new URI: {}, queueing.", outUri.getUri());
outUri.setPriorityWeight(status.getCrawlConfig().getCrawlConfig().getPriorityWeight());
if (outUri.addUriToQueue(status)) {
if (outUri.addUriToQueue()) {
wasQueued = true;
}
break;
case RETRY:
LOG.debug("Failed preconditions for: {}, queueing for retry.", outUri.getUri());
outUri.setPriorityWeight(status.getCrawlConfig().getCrawlConfig().getPriorityWeight());
if (outUri.addUriToQueue(status)) {
if (outUri.addUriToQueue()) {
wasQueued = true;
}
break;
@@ -131,10 +131,11 @@ public void postFetchSuccess(Metrics metrics) throws DbException {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());

frontier.getCrawlQueueManager().removeQUri(qUri);
status.incrementDocumentsCrawled()
.incrementBytesCrawled(metrics.getBytesDownloaded())
.incrementUrisCrawled(metrics.getUriCount())
.removeCurrentUri(qUri).saveStatus();
.saveStatus();
}
}

@@ -149,13 +150,13 @@ public void postFetchFailure(Error error) throws DbException {
MDC.put("uri", qUri.getUri());

PreconditionState state = ErrorHandler.fetchFailure(frontier, status, qUri, error);
switch (state) {
status.saveStatus();
switch(state) {
case DENIED:
status.removeCurrentUri(qUri).saveStatus();
frontier.getCrawlQueueManager().removeQUri(qUri);
break;
case RETRY:
qUri.save();
status.addCurrentUri(qUri).saveStatus();
break;
}
}
@@ -94,7 +94,7 @@ public boolean preFetch() throws DbException {
try {
if (!Preconditions.crawlExecutionOk(frontier, status)) {
LOG.debug("DENIED");
status.removeCurrentUri(qUri).saveStatus();
frontier.getCrawlQueueManager().removeQUri(qUri);
CrawlExecutionHelpers.postFetchFinally(frontier, status, qUri, 0);
return false;
}
@@ -133,7 +133,7 @@ public PageHarvestSpec getHarvestSpec() throws DbException {
qUri.setFetchStartTimeStamp(ProtoUtils.getNowTs());
qUri.generateSessionToken();
frontier.getCrawlQueueManager().updateCrawlHostGroup(qUri.getCrawlHostGroup());
status.addCurrentUri(this.qUri).setState(State.FETCHING).saveStatus();
status.setState(State.FETCHING).saveStatus();

LOG.debug("Fetching " + qUri.getUri());
return PageHarvestSpec.newBuilder()
@@ -76,7 +76,7 @@ public static ListenableFuture<PreconditionState> checkPreconditions(Frontier fr
frontier.writeLog(frontier, qUri);
}
if (!qUri.isUnresolved()) {
status.removeCurrentUri(qUri);
frontier.getCrawlQueueManager().removeQUri(qUri);
}
status.incrementDocumentsOutOfScope();
frontier.getOutOfScopeHandlerClient().submitUri(qUri.getQueuedUri());
@@ -170,7 +170,7 @@ public void onFailure(Throwable t) {
}
}
if (state == PreconditionState.DENIED && !qUri.getCrawlHostGroupId().isEmpty() && !qUri.getQueuedUri().getId().isEmpty()) {
status.removeCurrentUri(qUri);
frontier.getCrawlQueueManager().removeQUri(qUri);
}
future.set(state);
} catch (DbException e) {
@@ -237,7 +237,7 @@ public void accept(Boolean isAllowed) {
if (changedCrawlHostGroup != null) {
frontier.getCrawlQueueManager().removeTmpCrawlHostGroup(qUri.getQueuedUri(), changedCrawlHostGroup, true);
} else {
status.removeCurrentUri(qUri);
frontier.getCrawlQueueManager().removeQUri(qUri);
}
LOG.info("URI '{}' precluded by robots.txt", qUri.getUri());
qUri.setError(ExtraStatusCodes.PRECLUDED_BY_ROBOTS.toFetchError());
@@ -244,9 +244,9 @@ public QueuedUriWrapper save() throws DbQueryException, DbConnectionException {
return this;
}

public boolean addUriToQueue(StatusWrapper status) throws DbException {
public boolean addUriToQueue() throws DbException {
if (frontier.getCrawlQueueManager().uriNotIncludedInQueue(this)) {
return forceAddUriToQueue(status);
return forceAddUriToQueue();
}
LOG.debug("Found already included URI: {}, skipping.", getUri());
setError(ExtraStatusCodes.ALREADY_SEEN.toFetchError("Uri was already harvested"));
@@ -259,7 +259,7 @@ public boolean addUriToQueue(StatusWrapper status) throws DbException {
* @return
* @throws DbException
*/
public boolean forceAddUriToQueue(StatusWrapper status) throws DbException {
public boolean forceAddUriToQueue() throws DbException {
if (!shouldInclude()) {
return false;
}
(Diffs for the remaining two changed files are not shown here.)
