# ping_grpc.py
# Forked from fcaylus/airflow-grpc-starter.
from datetime import timedelta
from airflow import DAG
from airflow.providers.grpc.operators.grpc import GrpcOperator
from airflow.utils.dates import days_ago
# These files are generated by the protobuf compiler
from proto.ping_pb2_grpc import PingServiceStub
from proto.ping_pb2 import PingRequest
def response_handler(response, context) -> None:
    """Print the reply received from the gRPC service.

    This function is handed to the operator in this file as its
    ``response_callback``.

    Args:
        response: Reply object; its ``message`` attribute is printed.
        context: Second argument supplied by the caller; not used here.

    Returns:
        None. The text is written to standard output.
    """
    print("Response received from gRPC")
    # Interpolate instead of concatenating so that a non-str ``message``
    # is still printed rather than raising TypeError.
    print(f"Response : {response.message}")
# Default arguments handed to the DAG in this file through ``default_args``.
# NOTE(review): start_date is computed by days_ago(2) each time the file is
# parsed, so it is relative to "now" rather than a fixed date. days_ago is
# reportedly deprecated in newer Airflow releases - confirm against the
# Airflow version this project targets.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    start_date=days_ago(2),
    retries=0,
)
# The "ping_grpc" DAG, scheduled with a one-day interval.
dag = DAG(
    dag_id='ping_grpc',
    description='Ping gRPC service',
    schedule_interval=timedelta(days=1),
    default_args=default_args,
)
"""
Create a new gRPC Operator
"""
operator = GrpcOperator(
    task_id='grpc_request',
    dag=dag,
    # Id of the Airflow connection used by this operator. A connection of
    # type "GRPC Connection" with the same id should be configured in the
    # Airflow Admin panel. When omitted, the id defaults to "grpc_default".
    grpc_conn_id='grpc_default',
    # Service stub class; it must come from a generated gRPC module.
    stub_class=PingServiceStub,
    # Name of the stub method to call when this DAG is triggered.
    call_func='ping',
    # Data passed to the "ping" method (the request and/or gRPC metadata).
    data=dict(request=PingRequest()),
    streaming=False,
    # Callback invoked on responses.
    response_callback=response_handler,
)