Skip to content

Commit

Permalink
Migrates Gateway code to MQTT example (#1977)
Browse files Browse the repository at this point in the history
* Migrates Gateway code to MQTT example
* Refactors attach device and updates tests
  • Loading branch information
gguuss authored Jan 28, 2019
1 parent 2fea7b6 commit 65830eb
Show file tree
Hide file tree
Showing 4 changed files with 382 additions and 29 deletions.
2 changes: 1 addition & 1 deletion iot/api-client/manager/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cryptography==2.4.2
flaky==3.4.0
flaky==3.5.3
gcp-devrel-py-tools==0.0.15
google-api-python-client==1.7.5
google-auth-httplib2==0.0.3
Expand Down
265 changes: 238 additions & 27 deletions iot/api-client/mqtt_example/cloudiot_mqtt_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
# [START iot_mqtt_includes]
import argparse
import datetime
import logging
import os
import random
import ssl
Expand All @@ -33,6 +34,8 @@
import paho.mqtt.client as mqtt
# [END iot_mqtt_includes]

logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.CRITICAL)

# The initial backoff time after a disconnection occurs, in seconds.
minimum_backoff_time = 1

Expand Down Expand Up @@ -169,37 +172,193 @@ def get_client(
# [END iot_mqtt_config]


def detach_device(client, device_id):
"""Detach the device from the gateway."""
# [START detach_device]
detach_topic = '/devices/{}/detach'.format(device_id)
print('Detaching: {}'.format(detach_topic))
client.publish(detach_topic, '{}', qos=1)
# [END detach_device]


def attach_device(client, device_id, auth):
"""Attach the device to the gateway."""
# [START attach_device]
attach_topic = '/devices/{}/attach'.format(device_id)
attach_payload = '{{"authorization" : "{}"}}'.format(auth)
client.publish(attach_topic, attach_payload, qos=1)
# [END attach_device]


def listen_for_messages(
service_account_json, project_id, cloud_region, registry_id, device_id,
gateway_id, num_messages, private_key_file, algorithm, ca_certs,
mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, duration,
cb=None):
"""Listens for messages sent to the gateway and bound devices."""
# [START listen_for_messages]
global minimum_backoff_time

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# The topic devices receive configuration updates on.
device_config_topic = '/devices/{}/config'.format(device_id)
client.subscribe(device_config_topic, qos=1)

# The topic gateways receive configuration updates on.
gateway_config_topic = '/devices/{}/config'.format(gateway_id)
client.subscribe(gateway_config_topic, qos=1)

# The topic gateways receive error updates on. QoS must be 0.
error_topic = '/devices/{}/errors'.format(gateway_id)
client.subscribe(error_topic, qos=0)

# Wait for about a minute for config messages.
for i in range(1, duration):
client.loop()
if cb is not None:
cb(client)

if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break

delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
jwt_iat = datetime.datetime.utcnow()
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

time.sleep(1)

detach_device(client, device_id)

print('Finished.')
# [END listen_for_messages]


def send_data_from_bound_device(
service_account_json, project_id, cloud_region, registry_id, device_id,
gateway_id, num_messages, private_key_file, algorithm, ca_certs,
mqtt_bridge_hostname, mqtt_bridge_port, jwt_expires_minutes, payload):
"""Sends data from a gateway on behalf of a device that is bound to it."""
# [START send_data_from_bound_device]
global minimum_backoff_time

# Publish device events and gateway state.
device_topic = '/devices/{}/{}'.format(device_id, 'state')
gateway_topic = '/devices/{}/{}'.format(gateway_id, 'state')

jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = jwt_expires_minutes
# Use gateway to connect to server
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

attach_device(client, device_id, '')
print('Waiting for device to attach.')
time.sleep(5)

# Publish state to gateway topic
gateway_state = 'Starting gateway at: {}'.format(time.time())
print(gateway_state)
client.publish(gateway_topic, gateway_state, qos=1)

# Publish num_messages mesages to the MQTT bridge
for i in range(1, num_messages + 1):
client.loop()

if should_backoff:
# If backoff time is too large, give up.
if minimum_backoff_time > MAXIMUM_BACKOFF_TIME:
print('Exceeded maximum backoff time. Giving up.')
break

delay = minimum_backoff_time + random.randint(0, 1000) / 1000.0
time.sleep(delay)
minimum_backoff_time *= 2
client.connect(mqtt_bridge_hostname, mqtt_bridge_port)

payload = '{}/{}-{}-payload-{}'.format(
registry_id, gateway_id, device_id, i)

print('Publishing message {}/{}: \'{}\' to {}'.format(
i, num_messages, payload, device_topic))
client.publish(
device_topic, '{} : {}'.format(device_id, payload), qos=1)

seconds_since_issue = (datetime.datetime.utcnow() - jwt_iat).seconds
if seconds_since_issue > 60 * jwt_exp_mins:
print('Refreshing token after {}s').format(seconds_since_issue)
jwt_iat = datetime.datetime.utcnow()
client = get_client(
project_id, cloud_region, registry_id, gateway_id,
private_key_file, algorithm, ca_certs, mqtt_bridge_hostname,
mqtt_bridge_port)

time.sleep(5)

detach_device(client, device_id)

print('Finished.')
# [END send_data_from_bound_device]


def parse_command_line_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(description=(
'Example Google Cloud IoT Core MQTT device connection code.'))
parser.add_argument(
'--project_id',
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
help='GCP cloud project name')
parser.add_argument(
'--registry_id', required=True, help='Cloud IoT Core registry id')
parser.add_argument(
'--device_id', required=True, help='Cloud IoT Core device id')
parser.add_argument(
'--private_key_file',
required=True, help='Path to private key file.')
parser.add_argument(
'--algorithm',
choices=('RS256', 'ES256'),
required=True,
help='Which encryption algorithm to use to generate the JWT.')
parser.add_argument(
'--cloud_region', default='us-central1', help='GCP cloud region')
parser.add_argument(
'--ca_certs',
default='roots.pem',
help=('CA root from https://pki.google.com/roots.pem'))
parser.add_argument(
'--num_messages',
'--cloud_region', default='us-central1', help='GCP cloud region')
parser.add_argument(
'--data',
default='Hello there',
help='The telemetry data sent on behalf of a device')
parser.add_argument(
'--device_id', required=True, help='Cloud IoT Core device id')
parser.add_argument(
'--gateway_id', required=False, help='Gateway identifier.')
parser.add_argument(
'--jwt_expires_minutes',
default=20,
type=int,
default=100,
help='Number of messages to publish.')
help=('Expiration time, in minutes, for JWT tokens.'))
parser.add_argument(
'--listen_dur',
default=60,
type=int,
help='Duration (seconds) to listen for configuration messages')
parser.add_argument(
'--message_type',
choices=('event', 'state'),
Expand All @@ -217,19 +376,48 @@ def parse_command_line_args():
type=int,
help='MQTT bridge port.')
parser.add_argument(
'--jwt_expires_minutes',
default=20,
'--num_messages',
type=int,
help=('Expiration time, in minutes, for JWT tokens.'))
default=100,
help='Number of messages to publish.')
parser.add_argument(
'--private_key_file',
required=True,
help='Path to private key file.')
parser.add_argument(
'--project_id',
default=os.environ.get('GOOGLE_CLOUD_PROJECT'),
help='GCP cloud project name')
parser.add_argument(
'--registry_id', required=True, help='Cloud IoT Core registry id')
parser.add_argument(
'--service_account_json',
default=os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"),
help='Path to service account json file.')

# Command subparser
command = parser.add_subparsers(dest='command')

command.add_parser(
'device_demo',
help=mqtt_device_demo.__doc__)

command.add_parser(
'gateway_send',
help=send_data_from_bound_device.__doc__)

command.add_parser(
'gateway_listen',
help=listen_for_messages.__doc__)

return parser.parse_args()


# [START iot_mqtt_run]
def main():
def mqtt_device_demo(args):
"""Connects a device, sends data, and receives data."""
# [START iot_mqtt_run]
global minimum_backoff_time

args = parse_command_line_args()
global MAXIMUM_BACKOFF_TIME

# Publish to the events or state topic based on the flag.
sub_topic = 'events' if args.message_type == 'event' else 'state'
Expand All @@ -239,9 +427,9 @@ def main():
jwt_iat = datetime.datetime.utcnow()
jwt_exp_mins = args.jwt_expires_minutes
client = get_client(
args.project_id, args.cloud_region, args.registry_id, args.device_id,
args.private_key_file, args.algorithm, args.ca_certs,
args.mqtt_bridge_hostname, args.mqtt_bridge_port)
args.project_id, args.cloud_region, args.registry_id,
args.device_id, args.private_key_file, args.algorithm,
args.ca_certs, args.mqtt_bridge_hostname, args.mqtt_bridge_port)

# Publish num_messages mesages to the MQTT bridge once per second.
for i in range(1, args.num_messages + 1):
Expand Down Expand Up @@ -284,9 +472,32 @@ def main():

# Send events every second. State should not be updated as often
time.sleep(1 if args.message_type == 'event' else 5)
# [END iot_mqtt_run]


def main():
args = parse_command_line_args()

if args.command == 'gateway_listen':
listen_for_messages(
args.service_account_json, args.project_id,
args.cloud_region, args.registry_id, args.device_id,
args.gateway_id, args.num_messages, args.private_key_file,
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
args.mqtt_bridge_port, args.jwt_expires_minutes,
args.listen_dur)
return
elif args.command == 'gateway_send':
send_data_from_bound_device(
args.service_account_json, args.project_id,
args.cloud_region, args.registry_id, args.device_id,
args.gateway_id, args.num_messages, args.private_key_file,
args.algorithm, args.ca_certs, args.mqtt_bridge_hostname,
args.mqtt_bridge_port, args.jwt_expires_minutes, args.data)
return
else:
mqtt_device_demo(args)
print('Finished.')
# [END iot_mqtt_run]


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 65830eb

Please sign in to comment.