-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathfirebase.py
133 lines (102 loc) · 3.44 KB
/
firebase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# adapted from firebase/EventSource-Examples/python/chat.py by Shariq Hashme
from sseclient import SSEClient
import requests
from Queue import Queue
import json
import threading
import socket
class ClosableSSEClient(SSEClient):
def __init__(self, *args, **kwargs):
self.should_connect = True
super(ClosableSSEClient, self).__init__(*args, **kwargs)
def _connect(self):
if self.should_connect:
super(ClosableSSEClient, self)._connect()
else:
raise StopIteration()
def close(self):
self.should_connect = False
self.retry = 0
try:
self.resp.raw._fp.fp._sock.shutdown(socket.SHUT_RDWR)
self.resp.raw._fp.fp._sock.close()
except AttributeError:
pass
class RemoteThread(threading.Thread):
def __init__(self, parent, URL, function):
self.function = function
self.URL = URL
self.parent = parent
super(RemoteThread, self).__init__()
def run(self):
try:
self.sse = ClosableSSEClient(self.URL, chunk_size=32)
for msg in self.sse:
msg_data = json.loads(msg.data)
if msg_data is None: # keep-alives
continue
msg_event = msg.event
# TODO: update parent cache here
self.function((msg.event, msg_data))
except socket.error:
pass # this can happen when we close the stream
except KeyboardInterrupt:
self.close()
def close(self):
if self.sse:
self.sse.close()
def firebaseURL(URL):
if '.firebaseio.com' not in URL.lower():
if '.json' == URL[-5:]:
URL = URL[:-5]
if '/' in URL:
if '/' == URL[-1]:
URL = URL[:-1]
URL = 'https://' + \
URL.split('/')[0] + '.firebaseio.com/' + URL.split('/', 1)[1] + '.json'
else:
URL = 'https://' + URL + '.firebaseio.com/.json'
return URL
if 'http://' in URL:
URL = URL.replace('http://', 'https://')
if 'https://' not in URL:
URL = 'https://' + URL
if '.json' not in URL.lower():
if '/' != URL[-1]:
URL = URL + '/.json'
else:
URL = URL + '.json'
return URL
class subscriber:
def __init__(self, URL, function):
self.cache = {}
self.remote_thread = RemoteThread(self, firebaseURL(URL), function)
def start(self):
self.remote_thread.start()
def stop(self):
self.remote_thread.close()
self.remote_thread.join()
def wait(self):
self.remote_thread.join()
class FirebaseException(Exception):
pass
def put(URL, msg):
to_post = json.dumps(msg)
response = requests.put(firebaseURL(URL), data=to_post)
if response.status_code != 200:
raise FirebaseException(response.text)
def patch(URL, msg):
to_post = json.dumps(msg)
response = requests.patch(firebaseURL(URL), data=to_post)
if response.status_code != 200:
raise FirebaseException(response.text)
def get(URL):
response = requests.get(firebaseURL(URL))
if response.status_code != 200:
raise FirebaseException(response.text)
return json.loads(response.text)
def push(URL, msg):
to_post = json.dumps(msg)
response = requests.post(firebaseURL(URL), data=to_post)
if response.status_code != 200:
raise Exception(response.text)