-
Notifications
You must be signed in to change notification settings - Fork 24
/
example.py
122 lines (107 loc) · 5.29 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from farmbot import Farmbot, FarmbotToken
# Before we begin, we must download an access token from the
# API. To avoid copy/pasting passwords, it is best to create
# an access token and then store that token securely:
raw_token = FarmbotToken.download_token("test@example.com",
"password",
"https://my.farm.bot")
# This token is then passed to the Farmbot constructor:
fb = Farmbot(raw_token)
# If you are just doing testing, such as local development,
# it is possible to skip token creation and login with email
# and password. This is not recommended for production devices:
# fb = Farmbot.login(email="em@i.l",
# password="pass",
# server="https://my.farm.bot")
# The next step is to call fb.connect(), but we are not ready
# to do that yet. Before we can call connect(), we must
# create a "handler" object. FarmBot control is event-based
# and the handler object is responsible for integrating all
# of those events into a custom application.
#
# At a minimum, the handler must respond to the following
# methods:
# on_connect(self, bot: Farmbot, client: Mqtt) -> None
# on_change(self, bot: Farmbot, state: Dict[Any, Any]) -> None
# on_log(self, _bot: Farmbot, log: Dict[Any, Any]) -> None
# on_error(self, _bot: Farmbot, _response: ErrorResponse) -> None
# on_response(self, _bot: Farmbot, _response: OkResponse) -> None
#
# FarmBotPy will call the appropriate method whenever an event
# is triggered. For example, the method `on_log` will be
# called with the last log message every time a new log
# message is created.
class MyHandler:
# The `on_connect` event is called whenever the device
# connects to the MQTT server. You can place initialization
# logic here.
#
# The callback is passed a FarmBot instance, plus an MQTT
# client object (see Paho MQTT docs to learn more).
def on_connect(self, bot, mqtt_client):
# Once the bot is connected, we can send RPC commands.
# Every RPC command returns a unique, random request
# ID. Later on, we can use this ID to track our commands
# as they succeed/fail (via `on_response` / `on_error`
# callbacks):
request_id1 = bot.move_absolute(x=10, y=20, z=30)
# => "c580-6c-11-94-130002"
print("MOVE_ABS REQUEST ID: " + request_id1)
request_id2 = bot.send_message("Hello, world!")
# => "2000-31-49-11-c6085c"
print("SEND_MESSAGE REQUEST ID: " + request_id2)
def on_change(self, bot, state):
# The `on_change` event is most frequently triggered
# event. It is called any time the device's internal
# state changes. Example: Updating X/Y/Z position as
# the device moves across the garden.
# The bot maintains all this state in a single JSON
# object that is broadcast over MQTT constantly.
# It is a very large object, so we are printing it
# only as an example.
print("NEW BOT STATE TREE AVAILABLE:")
print(state)
# Since the state tree is very large, we offer
# convenience helpers such as `bot.position()`,
# which returns an (x, y, z) tuple of the device's
# last known position:
print("Current position: (%.2f, %.2f, %.2f)" % bot.position())
# A less convenient method would be to access the state
# tree directly:
pos = state["location_data"]["position"]
xyz = (pos["x"], pos["y"], pos["z"])
print("Same information as before: " + str(xyz))
# The `on_log` event fires every time a new log is created.
# The callback receives a FarmBot instance, plus a JSON
# log object. The most useful piece of information is the
# `message` attribute, though other attributes do exist.
def on_log(self, bot, log):
print("New message from FarmBot: " + log['message'])
# When a response succeeds, the `on_response` callback
# fires. This callback is passed a FarmBot object, as well
# as a `response` object. The most important part of the
# `response` is `response.id`. This `id` will match the
# original request ID, which is useful for cross-checking
# pending operations.
def on_response(self, bot, response):
print("ID of successful request: " + response.id)
# If an RPC request fails (example: stalled motors, firmware
# timeout, etc..), the `on_error` callback is called.
# The callback receives a FarmBot object, plus an
# ErrorResponse object.
def on_error(self, bot, response):
# Remember the unique ID that was returned when we
# called `move_absolute()` earlier? We can cross-check
# the ID by calling `response.id`:
print("ID of failed request: " + response.id)
# We can also retrieve a list of error message(s) by
# calling response.errors:
print("Reason(s) for failure: " + str(response.errors))
# Now that we have a handler class to use, let's create an
# instance of that handler and `connect()` it to the FarmBot:
handler = MyHandler()
# Keep in mind that `connect()` will block the current thread.
# If you require parallel operations, consider using a background
# thread or a worker process.
fb.connect(handler)
print("This line will not execute. `connect()` is a blocking call.")