This repository has been archived by the owner on Jul 21, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathosrm_scrape.py
58 lines (48 loc) · 1.7 KB
/
osrm_scrape.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from airflow.utils.dates import days_ago
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
# Module-level logger taken from Airflow's LoggingMixin.
log = LoggingMixin().log

# Defines the "osrm_scrape" DAG: two Kubernetes pod tasks run in sequence,
# `python main.py` followed by `python run_glue.py`, both from the same image.
#
# Only the operator import is guarded. If it is unavailable a warning is
# logged and this module defines no DAG. Errors raised while building the DAG
# itself are deliberately NOT caught here, so they are not misreported as a
# missing operator.
try:
    from airflow.contrib.operators.kubernetes_pod_operator import (
        KubernetesPodOperator,
    )
except ImportError as e:
    # Lazy %-style argument; renders the same text as the previous
    # string concatenation. `Logger.warn` is a deprecated alias.
    log.warning("Could not import KubernetesPodOperator: %s", e)
else:
    args = {
        "owner": "Robin",
        "retries": 0,
        "email": ["robin.linacre@digital.justice.gov.uk"],
        "start_date": days_ago(1),
    }

    dag = DAG(
        dag_id="osrm_scrape",
        default_args=args,
        schedule_interval="@once",
    )

    # Settings shared by both pods, kept in one place so the two tasks cannot
    # drift apart (e.g. one image tag bumped but not the other).
    # NOTE(review): the "iam.amazonaws.com/role" annotation presumably grants
    # the pod an AWS IAM role via the cluster's IAM integration - confirm.
    _pod_kwargs = {
        "namespace": "airflow",
        "image": "593291632749.dkr.ecr.eu-west-1.amazonaws.com/airflow-osrm-scrape:v0.0.5",
        "cmds": ["bash", "-c"],
        "labels": {"foo": "bar"},
        "in_cluster": True,
        "get_logs": True,
        "dag": dag,
        "annotations": {"iam.amazonaws.com/role": "airflow_osrm_scraper"},
        "image_pull_policy": "Always",
    }

    # Step 1: run the scraper entry point inside the pod.
    osrm_scrape = KubernetesPodOperator(
        arguments=["python main.py"],
        name="osrm-pod-scrape",
        task_id="scrape_all",
        **_pod_kwargs
    )

    # Step 2: run the Glue script (task id: deduplicate_and_partition).
    run_gluejob = KubernetesPodOperator(
        arguments=["python run_glue.py"],
        name="osrm-pod-glue",
        task_id="deduplicate_and_partition",
        **_pod_kwargs
    )

    # The Glue step runs only after the scrape step.
    osrm_scrape >> run_gluejob