-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.py
executable file
·311 lines (246 loc) · 10.1 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import StringIO
import json
import random
import urllib
import urllib2
import logging
import ConfigParser
import os
# standard app engine imports
from google.appengine.api import urlfetch
import webapp2
TOKEN = ""
HOOK_TOKEN = ""
OWM_KEY = ""
PROJECT_ID = ""
# Lambda functions to parse updates from Telegram
def getText(update): return update["message"]["text"]
def getLocation(update): return update["message"]["location"]
def getChatId(update): return update["message"]["chat"]["id"]
def getName(update): return update["message"]["from"]["first_name"]
def getResult(update): return update["result"]
# # Lambda functions to parse weather responses
def getDesc(w): return w["weather"][0]["description"]
def getTemp(w): return w["main"]["temp"]
def getCity(w): return w["name"]
logger = logging.getLogger("LizBot")
logger.setLevel(logging.DEBUG)
# Cities for weather requests
cities = ["London", "Brasov"]
# Accepted commands
commands = ["/weather", "/fact", "/mirror", "/fortune", "/trivia"]
# Keep track of conversation states: 'weatherReq', 'verifying', 'verified'
chats = {}
# Expected answers for pending trivia question
expected = {}
# --------------- Helper functions ---------------
# Read settings from configuration file
def parseConfig():
global BASE_URL, URL_OWM, HOOK_TOKEN, PROJECT_ID
c = ConfigParser.ConfigParser()
c.read("config.ini")
TOKEN = c.get("Settings", "TOKEN")
BASE_URL = "https://api.telegram.org/bot" + TOKEN + "/"
OWM_KEY = c.get("Settings", "OWM_KEY")
URL_OWM = "http://api.openweathermap.org/data/2.5/weather?appid={}&units=metric".format(OWM_KEY)
HOOK_TOKEN = c.get("Settings", "HOOK_TOKEN")
PROJECT_ID = c.get("Settings", "PROJECT_ID")
# Set requests timeout (default is 15)
def setTimeout(numSec = 60):
urlfetch.set_default_fetch_deadline(numSec)
# Deserialise object and serialise it to JSON formatted string
def formatResp(obj):
parsed = json.load(obj)
return json.dumps(parsed, indent=4, sort_keys=True)
# Make a request and get JSON response
def makeRequest(url):
logger.debug("URL: %s" % url)
r = urllib2.urlopen(url)
resp = json.load(r)
return resp
# Build a one-time keyboard for on-screen options
def buildKeyboard(items):
keyboard = [[{"text":item}] for item in items]
replyKeyboard = {"keyboard":keyboard, "one_time_keyboard": True}
logger.debug(replyKeyboard)
return json.dumps(replyKeyboard)
# Send URL-encoded message to chat id
def sendMessage(text, chatId, interface=None):
params = {
"chat_id": str(chatId),
"text": text.encode("utf-8"),
"parse_mode": "Markdown",
}
if interface:
params["reply_markup"] = interface
resp = urllib2.urlopen(BASE_URL + "sendMessage", urllib.urlencode(params)).read()
# --------------- Weather related stuff ---------------
# Query OWM for the weather for place or coords
def getWeather(place):
if isinstance(place, dict): # coordinates provided
lat, lon = place["latitude"], place["longitude"]
url = URL_OWM + "&lat=%f&lon=%f&cnt=1" % (lat, lon)
logger.info("Requesting weather: " + url)
js = makeRequest(url)
logger.debug(js)
return u"%s \N{DEGREE SIGN}C, %s in %s" % (getTemp(js), getDesc(js), getCity(js))
else: # place name provided
# make req
url = URL_OWM + "&q={}".format(place)
logger.info("Requesting weather: " + url)
js = makeRequest(url)
logger.debug(js)
return u"%s \N{DEGREE SIGN}C, %s in %s" % (getTemp(js), getDesc(js), getCity(js))
def buildCitiesKeyboard():
keyboard = [[{"text": c}] for c in cities]
keyboard.append([{"text": "Share location", "request_location": True}])
replyKeyboard = {"keyboard": keyboard, "one_time_keyboard": True}
logger.debug(replyKeyboard)
return json.dumps(replyKeyboard)
# --------------- Random facts/fortune/compliments/challenges functions ---------------
# Read random line from big text file
def getRandom(fName):
f = open(fName, "r")
fSize = os.stat(fName)[6]
# Seek to a random place in the file
f.seek(random.randint(0, fSize-1))
# The first readline since it may fall in the middle of a line
f.readline()
# Read the next complete line
line = f.readline()
# If last line, wrap and read the first
if not line:
f.seek(0)
f.readline()
f.close()
return line.strip()
def getFact():
fName = "facts.txt"
return getRandom(fName)
def getCompliment():
fName = "compliments.txt"
return getRandom(fName)
def getCookie():
fName = "fortunes.dat"
return getRandom(fName)
def getChall():
questions = json.loads(open("trivia.json").read())
numQ = len(questions)
return questions[random.randint(0, numQ-1)]
# --------------- Request handler functions ---------------
# Return basic information about the bot
class MeHandler(webapp2.RequestHandler):
def get(self):
setTimeout()
parseConfig()
url = BASE_URL + "getMe"
respBuf = urllib2.urlopen(url)
self.response.headers["Content-Type"] = "text/plain"
self.response.write(formatResp(respBuf))
# Get information about webhook status
class GetWebhookHandler(webapp2.RequestHandler):
def get(self):
setTimeout()
parseConfig()
url = BASE_URL + "getWebhookInfo"
respBuf = urllib2.urlopen(url)
self.response.headers["Content-Type"] = "text/plain"
self.response.write(formatResp(respBuf))
# Set a webhook url for Telegram to POST to
class SetWebhookHandler(webapp2.RequestHandler):
def get(self):
setTimeout()
parseConfig()
hookUrl = "https://%s.appspot.com/TG%s" % (PROJECT_ID, HOOK_TOKEN)
logger.info("Setting new webhook to: %s" % hookUrl)
respBuf = urllib2.urlopen(BASE_URL + "setWebhook", urllib.urlencode({
"url": hookUrl
}))
self.response.headers["Content-Type"] = "text/plain"
self.response.write(formatResp(respBuf))
# Remove webhook integration
class DeleteWebhookHandler(webapp2.RequestHandler):
def get(self):
setTimeout()
parseConfig()
url = BASE_URL + "deleteWebhook"
respBuf = urllib2.urlopen(url)
self.response.headers["Content-Type"] = "text/plain"
self.response.write(formatResp(respBuf))
# Handler for the webhook, called by Telegram
class WebhookHandler(webapp2.RequestHandler):
def post(self):
setTimeout()
parseConfig()
logger.info("Received request: %s from %s" % (self.request.url, self.request.remote_addr))
if HOOK_TOKEN not in self.request.url:
# Not coming from Telegram
logger.error("Post request without token from IP: %s" % self.request.remote_addr)
return
body = json.loads(self.request.body)
chatId = getChatId(body)
logger.info("Response body: " + str(body))
try:
text = getText(body)
except Exception as e:
logger.info("No text field in update. Try to get location")
loc = getLocation(body)
# Was weather previously requested?
if (chatId in chats) and (chats[chatId] == "weatherReq"):
logger.info("Weather requested for %s in chat id %d" % (str(loc), chatId))
# Send weather to chat id and clear state
sendMessage(getWeather(loc), chatId)
del chats[chatId]
return
if text == "/start":
keyboard = buildKeyboard(commands)
sendMessage("Hello %s! Why not try the commands below:" % getName(body), chatId, keyboard)
elif text == "/weather":
keyboard = buildCitiesKeyboard()
chats[chatId] = "weatherReq"
sendMessage("Select a city", chatId, keyboard)
elif (text in cities) and (chatId in chats) and (chats[chatId] == "weatherReq"):
logger.info("Weather requested for %s" % text)
# Send weather to chat id and clear state
sendMessage(getWeather(text), chatId)
del chats[chatId]
elif text == "/fact":
sendMessage(getFact(), chatId)
elif text == "/mirror":
sendMessage(getCompliment(), chatId)
elif text == "/fortune":
sendMessage(getCookie(), chatId)
elif text == "/trivia":
# Has user answered the challenge?
if (chatId not in chats) or (chats[chatId] != "waitingAnswer"):
logger.info("Sendia trivia question")
chal = getChall()
question = chal["Question"]
answers = chal["Answers"]
keyboard = buildKeyboard(answers)
sendMessage("New chat! Please verify by answering a simple question!", chatId)
sendMessage(question, chatId, keyboard)
# Change status
chats[chatId] = "waitingAnswer"
expected[chatId] = answers[chal["Correct"]]
elif (chatId in chats) and (chats[chatId] == "waitingAnswer"):
logger.info("Verify user answer")
if text == expected[chatId]:
logger.info("Correct trivia answer!")
sendMessage("Wow! That is correct!", chatId)
else:
logger.info("Incorrect answer")
sendMessage("Wrong answer :(", chatId)
del chats[chatId]
elif text.startswith("/"):
sendMessage("Cahn's Axiom: When all else fails, read the instructions", chatId)
else:
keyboard = buildKeyboard(commands)
sendMessage("I learn new things every day but for now you can ask me about the following:", chatId, keyboard)
app = webapp2.WSGIApplication([
('/me', MeHandler),
('/set_webhook', SetWebhookHandler),
('/get_webhook', GetWebhookHandler),
('/del_webhook', DeleteWebhookHandler),
(r'/TG.*' , WebhookHandler),
], debug=True)