-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtime_manager.py
84 lines (78 loc) · 2.85 KB
/
time_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
try:
import usocket as socket
except:
import socket
try:
import ustruct as struct
except:
import struct
import network
import machine
import utime
class TimeManager(object):
# (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60
NTP_DELTA = 3155673600
def __init__(self,
host = "pool.ntp.org",
port = 123,
debug = False,
):
#load the station network interface so we can determine connection status
self.sta_if = network.WLAN(network.STA_IF)
self.rtc = machine.RTC()
self.host = host
self.port = port
self._debug = debug
def request_ntp_time(self):
if self._debug:
print("### TimeManager.request_ntp_time ###")
NTP_QUERY = bytearray(48)
NTP_QUERY[0] = 0x1b
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1)
addr = socket.getaddrinfo(self.host, self.port)[0][-1]
res = sock.sendto(NTP_QUERY, addr)
msg = sock.recv(48)
val = struct.unpack("!I", msg[40:44])[0]
# There's currently no timezone support in MicroPython, so
# utime.localtime() will return UTC time (as if it was .gmtime())
t = val - self.NTP_DELTA
tm = utime.localtime(t)
if self._debug:
print("Received NTP time t=%d, tm=%r" % (t,tm))
return tm
finally:
sock.close()
def get_datetime(self, force_RTC_time = False, sync_RTC = True):
if self._debug:
print("### TimeManager.get_datetime ###")
if self.sta_if.isconnected() and not force_RTC_time:
try:
tm = self.request_ntp_time()
dt = tm[0:3] + (0,) + tm[3:6] + (0,)
if sync_RTC:
if self._debug:
print("Synchronizing RTC to NTP time")
print("RTC time before:",self.rtc.datetime())
self.rtc.datetime(dt) #sync the RTC
if self._debug:
print("RTC time after:",self.rtc.datetime())
except OSError as err:
print("WARNING! got exception: %s" % err)
return self.rtc.datetime()
################################################################################
# TEST CODE
################################################################################
if __name__ == "__main__":
import network_setup
network_setup.do_connect()
TM = TimeManager()
print(TM.get_datetime())
# print(TM.get_datetime(force_RTC_time = True))
# utime.sleep(1)
# print(TM.get_datetime())
# print(TM.get_datetime(force_RTC_time = True))
# utime.sleep(1)
# print(TM.get_datetime())
# print(TM.get_datetime(force_RTC_time = True))