-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcrawlmanager.py
110 lines (91 loc) · 3.9 KB
/
crawlmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
"""
shub-workflow based script (runs on Zyte ScrapyCloud) for easy managing of consumer spiders
from multiple slots. It checks available slots and schedules a job for each one.
"""
import json
import random
import logging
import humanize
from shub_workflow.crawl import CrawlManager
from hcf_backend.utils.hcfpal import HCFPal
# Emit only the bare message text, for records at INFO level and above.
logging.basicConfig(format="%(message)s", level=logging.INFO)
# Module-level logger used by the crawl manager defined in this file.
logger = logging.getLogger(__name__)
class HCFCrawlManager(CrawlManager):
    """Crawl manager that schedules one consumer spider job per frontier slot.

    On every workflow loop it lists the slots of the frontier given on the
    command line (restricted to the given slot prefix), discards the slots
    already claimed by an owned running/pending job of the target spider,
    and schedules a new spider job for each remaining non-empty slot, up to
    ``max_running_jobs`` when that limit is set.
    """

    # NOTE(review): presumably consumed by the CrawlManager base class as the
    # default for ``max_running_jobs`` -- confirm against shub-workflow.
    default_max_jobs = 1

    def __init__(self):
        """Initialise the base manager and the HCFPal helper for the project."""
        super().__init__()
        self.hcfpal = HCFPal(self.project_id)

    def add_argparser_options(self):
        """Register the frontier-specific command line arguments."""
        super().add_argparser_options()
        self.argparser.add_argument("frontier", help="Frontier name")
        self.argparser.add_argument("prefix", help="Slot prefix")
        # Optional JSON object merged into the frontera settings of every
        # scheduled job (see workflow_loop).
        self.argparser.add_argument("--frontera-settings-json")

    @property
    def description(self):
        """Return the module docstring as the script description."""
        return __doc__

    def print_frontier_status(self):
        """Log the per-slot counts and the total count of the frontier.

        Returns:
            A new set holding the names of the slots reported for the
            frontier and slot prefix given on the command line.
        """
        result = self.hcfpal.get_slots_count(self.args.frontier, self.args.prefix)
        slots = result["slots"]
        logger.info(f"Frontier '{self.args.frontier}' status:")
        for slot in sorted(slots):
            logger.info(f"\t{slot}: {slots[slot]}")
        logger.info("\tTotal count: {}".format(humanize.intcomma(result["total"])))
        return set(slots)

    def workflow_loop(self):
        """Schedule consumer jobs for the free, non-empty frontier slots.

        Returns:
            ``True`` when at least one unclaimed, non-empty slot was found
            (even if the running-jobs limit prevented scheduling a job for
            it). Otherwise, whether any owned running/pending job is still
            consuming one of the frontier slots.
        """
        available_slots = self.print_frontier_status()

        # Remove from the candidate set every slot that an owned running or
        # pending job is already consuming, counting those jobs as we go.
        running_jobs = 0
        for state in ("running", "pending"):
            for job in self.get_owned_jobs(
                spider=self.args.spider, state=state, meta=["spider_args"]
            ):
                job_settings = json.loads(
                    job["spider_args"].get("frontera_settings_json", "{}")
                )
                if "HCF_CONSUMER_SLOT" in job_settings:
                    slot = job_settings["HCF_CONSUMER_SLOT"]
                    if slot in available_slots:
                        available_slots.discard(slot)
                        running_jobs += 1

        # Keep only the slots that still have something to consume.
        available_slots = [
            slot
            for slot in available_slots
            if self.hcfpal.get_slot_count(self.args.frontier, slot) > 0
        ]
        logger.info(f"Available slots: {available_slots!r}")
        if not available_slots:
            return bool(running_jobs)

        # Shuffle so that no slot is systematically favoured when the
        # running-jobs limit truncates the candidate list.
        random.shuffle(available_slots)
        if self.max_running_jobs:
            # Clamp at zero: when more jobs are running than the limit allows
            # the difference is negative, and a negative slice stop would drop
            # items from the END of the list instead of emptying it, so jobs
            # would be scheduled while already over the limit.
            max_jobs = max(0, self.max_running_jobs - running_jobs)
            available_slots = available_slots[:max_jobs]
            if not available_slots:
                logger.info("Already running max number of jobs.")

        base_frontera_settings = {}
        if self.args.frontera_settings_json:
            base_frontera_settings = json.loads(self.args.frontera_settings_json)

        for slot in available_slots:
            # Per-job copy so the slot-specific keys never leak into the
            # shared base settings.
            frontera_settings = base_frontera_settings.copy()
            frontera_settings.update(
                {
                    "HCF_CONSUMER_SLOT": slot,
                    "HCF_CONSUMER_FRONTIER": self.args.frontier,
                }
            )
            frontera_settings_json = json.dumps(frontera_settings)
            logger.info(
                f"Will schedule spider job with frontera settings {frontera_settings_json}"
            )
            jobkey = self.schedule_spider_with_jobargs(
                job_args_override={
                    "spider_args": {"frontera_settings_json": frontera_settings_json},
                },
                spider=self.args.spider,
            )
            logger.info(
                f"Scheduled job {jobkey} with frontera settings {frontera_settings_json}"
            )
        return True
if __name__ == "__main__":
    # Script entry point: build the manager and hand control to its run() method.
    manager = HCFCrawlManager()
    manager.run()