forked from collective/jquery.recurrenceinput.js
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_server.py
executable file
·203 lines (168 loc) · 6.27 KB
/
test_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#!/usr/bin/env python
# This script returns a formattedDate attribute on each occurrence.
# That attribute is deprecated, as it requires i18n on both the client
# and the server side; formatting is now done on the client side.
from wsgiref.simple_server import make_server
from wsgiref.util import setup_testing_defaults
from mimetypes import guess_type
import os
import sys
import SocketServer
import urlparse
import datetime
from dateutil import rrule
import json
import re
# The first command-line argument, when given, is the port to listen on;
# without it the server uses port 8000.
port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000

# How many batches to show before + after the current batch.
BATCH_DELTA = 3
# Translations from dateinput formatting to Python (strftime) formatting.
#
# The substitutions are applied one after another, so longer tokens must come
# before shorter ones.  The single-letter tokens use a negative lookbehind so
# they do not rewrite the letter of a directive ("%d", "%m") that an earlier
# substitution just produced.
# NOTE(review): "%e" is a platform strftime extension rather than one of the
# directives Python documents as portable -- confirm it exists on the
# platforms this test server is run on.
DATEFORMAT_XLATE = [
    (re.compile(pattern), replacement) for (pattern, replacement) in (
        ('dddd', '%A'),
        ('ddd', '%a'),
        ('dd', '%d'),
        (r'(?<!%)d', '%e'),  # Will include a leading space for 1-9
        ('mmmm', '%B'),
        ('mmm', '%b'),
        ('mm', '%m'),
        (r'(?<!%)m', '%m'),  # Will include leading zero
        ('yyyy', '%Y'),
        ('yy', '%y'),
    )
]


def dateformat_xlate(dateformat):
    """Translate a dateinput format string into a strftime format string.

    Every (regexp, replacement) pair of DATEFORMAT_XLATE is applied in order
    to the whole string; text matching no pattern is passed through unchanged.

    :param dateformat: format string using d/dd/ddd/dddd, m/mm/mmm/mmmm and
        yy/yyyy tokens.
    :returns: the same string with those tokens replaced by strftime
        directives.
    """
    for regexp, replacement in DATEFORMAT_XLATE:
        dateformat = regexp.sub(replacement, dateformat)
    return dateformat
def _occurrence_entry(date, date_format, occurrence_type):
    """Build the JSON-serialisable record describing one date."""
    return {
        'date': date.strftime('%Y%m%dT%H%M%S'),
        'formattedDate': date.strftime(date_format),
        'type': occurrence_type,
    }


def calculate_occurrences(data):
    """Expand a recurrence rule into one batch of occurrences plus paging info.

    :param data: mapping of parameter name to a list of string values; only
        the first item of each list is read.  Required keys: ``format``,
        ``year``, ``month``, ``day`` and ``rrule``.  Optional keys:
        ``batch_size`` (default 10) and ``start`` (default 0).
    :returns: ``{'occurrences': [...], 'batch': {...}}``.  Each occurrence is
        a dict with ``date`` (``%Y%m%dT%H%M%S``), ``formattedDate`` and
        ``type`` (``'start'``, ``'rrule'``, ``'rdate'`` or ``'exdate'``).
        ``batch`` holds ``start``, ``end``, ``batch_size``, ``batches`` (a
        list of 1-based ``(first, last)`` index pairs) and ``currentBatch``
        (index of the current batch within ``batches``).
    :raises ValueError: if ``batch_size`` is not a positive integer.  Errors
        raised by ``int()``, ``datetime.datetime()`` or ``rrule.rrulestr()``
        for malformed values propagate unchanged, as does ``KeyError`` for a
        missing required key.
    """
    # TODO: Return error on failure
    date_format = dateformat_xlate(data['format'][0])
    start_date = datetime.datetime(int(data['year'][0]),
                                   int(data['month'][0]),
                                   int(data['day'][0]))
    rule = rrule.rrulestr(data['rrule'][0], dtstart=start_date)
    iterator = iter(rule)

    batch_size = int(data['batch_size'][0]) if 'batch_size' in data else 10
    if batch_size <= 0:
        # A zero size would divide by zero below and a negative one yields
        # meaningless paging, so reject both explicitly.
        raise ValueError('batch_size must be a positive integer')

    start = int(data['start'][0]) if 'start' in data else 0
    start = max(0, start)  # A negative offset would give negative batches

    cur_batch = start // batch_size
    start = cur_batch * batch_size  # Avoid stupid start-values

    # Excluded and explicitly added dates are read from the rule's private
    # ``_exdate`` / ``_rdate`` attributes when the parsed rule has them.
    exdates = sorted(getattr(rule, '_exdate', []))
    rdates = getattr(rule, '_rdate', [])

    # ``i`` counts every date seen so far (occurrences and exdates alike);
    # the first ``start`` of them are skipped, the next ``batch_size`` kept.
    i = 0
    batch_end = start + batch_size
    occurrences = []
    for date in iterator:
        while exdates and date > exdates[0]:
            # There are exdates that appear before this date; they take up
            # a slot, and are reported once we are past the skipped part.
            exdate = exdates.pop(0)
            if i >= start:
                occurrences.append(
                    _occurrence_entry(exdate, date_format, 'exdate'))
            i += 1

        if i >= batch_end:
            break  # We are done!

        i += 1
        if i <= start:
            # We are still iterating up to the first event, so skip this:
            continue

        # Add it to the results
        if date in rdates:
            occurrence_type = 'rdate'
        elif date == start_date:
            occurrence_type = 'start'
        else:
            occurrence_type = 'rrule'
        occurrences.append(
            _occurrence_entry(date, date_format, occurrence_type))

    # There may be exdates left over after the loop above stopped.
    # Excluding the last dates makes no sense, as you can change the
    # range instead, but we need to support it anyway.
    while exdates:
        occurrences.append(
            _occurrence_entry(exdates.pop(0), date_format, 'exdate'))

    # Calculate no of occurrences, but only to a max of three times
    # the batch size. This will support infinite recurrence in a
    # useable way, as there will always be more batches.
    first_batch = max(0, cur_batch - BATCH_DELTA)
    last_batch = max(BATCH_DELTA * 2, cur_batch + BATCH_DELTA)
    maxcount = (batch_size * last_batch) - start

    # Keep consuming the same iterator; at least one further date is
    # consumed (if available) before the limit is checked.
    num_occurrences = 0
    for _ in iterator:
        num_occurrences += 1
        if num_occurrences >= maxcount:
            break

    # Total number of occurrences.
    # NOTE(review): this adds a full ``batch_size + start`` regardless of how
    # many dates the first loop actually produced -- confirm that is intended
    # for recurrences shorter than one batch.
    num_occurrences += batch_size + start

    max_batch = (num_occurrences - 1) // batch_size
    if last_batch > max_batch:
        last_batch = max_batch
        first_batch = max(0, max_batch - (BATCH_DELTA * 2))

    batches = [((x * batch_size) + 1, (x + 1) * batch_size)
               for x in range(first_batch, last_batch + 1)]
    batch_data = {
        'start': start,
        'end': num_occurrences,
        'batch_size': batch_size,
        'batches': batches,
        'currentBatch': cur_batch - first_batch,
    }

    return {'occurrences': occurrences, 'batch': batch_data}
def _send(start_response, status, content_type, body):
    """Start the response with *status* and return *body* as the WSGI result."""
    start_response(status, [('Content-type', content_type),
                            ('Content-Length', str(len(body)))])
    return [body]


def application(environ, start_response):
    """WSGI entry point of the test server.

    POST requests carry a form-encoded body with the parameters ``year``,
    ``month``, ``day``, ``rrule`` and ``format``; the reply is the JSON
    encoding of ``calculate_occurrences()`` for that body.  Any other request
    serves the file named by ``PATH_INFO``, relative to the current working
    directory.

    Error replies (all ``text/plain``):

    * 400 -- unusable ``CONTENT_LENGTH``, missing required parameter, or a
      parameter value that ``calculate_occurrences()`` rejects.
    * 403 -- ``PATH_INFO`` points outside the working directory.
    * 404 -- the requested file cannot be opened.

    :param environ: WSGI environment; missing keys are filled in with
        ``setup_testing_defaults()``.
    :param start_response: WSGI ``start_response`` callable.
    :returns: a one-element list holding the response body.
    """
    setup_testing_defaults(environ)

    if environ['REQUEST_METHOD'] == 'POST':
        try:
            # The key may be absent or empty; treat both as an empty body.
            length = int(environ.get('CONTENT_LENGTH') or 0)
        except ValueError:
            return _send(start_response, '400 Bad Request', 'text/plain',
                         'Invalid Content-Length')

        data = urlparse.parse_qs(environ['wsgi.input'].read(length))
        print("Recieved data: %s" % (data,))

        # Check for required parameters (not with assert, which is stripped
        # under -O and would otherwise surface as a server error):
        required = ('year', 'month', 'day', 'rrule', 'format')
        missing = [name for name in required if name not in data]
        if missing:
            return _send(start_response, '400 Bad Request', 'text/plain',
                         'Missing required parameter(s): ' + ', '.join(missing))

        try:
            result = calculate_occurrences(data)
        except (ValueError, ZeroDivisionError) as error:
            # Malformed numbers, dates or rules, or a zero batch size, are
            # the client's mistake rather than a server failure.
            return _send(start_response, '400 Bad Request', 'text/plain',
                         'Invalid request: %s' % (error,))

        return _send(start_response, '200 OK', 'application/json',
                     json.dumps(result))

    # Static file: rebuild the path from its components and make sure that
    # ".." segments (or an absolute/drive-qualified result) cannot lead out
    # of the working directory.
    parts = [part for part in environ['PATH_INFO'].split('/') if part]
    filename = os.path.normpath(os.path.join(*parts)) if parts else ''
    escapes_root = (filename == os.pardir
                    or filename.startswith(os.pardir + os.sep)
                    or os.path.isabs(filename)
                    or bool(os.path.splitdrive(filename)[0]))
    if escapes_root:
        return _send(start_response, '403 Forbidden', 'text/plain',
                     'Forbidden')

    try:
        with open(filename, 'rb') as served:
            response_body = served.read()
    except IOError:
        return _send(start_response, '404 Not Found', 'text/plain',
                     'Not Found')

    # guess_type() yields None for unknown extensions, which is not a valid
    # header value; fall back to a generic binary type.
    content_type = guess_type(filename)[0] or 'application/octet-stream'
    return _send(start_response, '200 OK', content_type, response_body)
# Create the WSGI server for ``application`` on the configured port
# (the host argument is the empty string).
httpd = make_server('', port, application)
print("Serving at port %s" % (port,))

# Respond to requests, one at a time, until the process is killed.
handle_one = httpd.handle_request
while True:
    handle_one()