-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpatient.py
626 lines (553 loc) · 22.6 KB
/
patient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
"""Patient API - implements patient specific views such as patient search
NB - this is not to be confused with 'patients', which defines views
for staff
"""
from datetime import datetime
import json
from flask import Blueprint, abort, current_app, jsonify, request
from sqlalchemy import and_
from werkzeug.exceptions import Unauthorized
from ..audit import auditable_event
from ..cache import cache
from ..database import db
from ..date_tools import FHIR_datetime
from ..extensions import oauth
from ..models.adherence_data import sorted_adherence_data
from ..models.communication import Communication
from ..models.fhir import bundle_results
from ..models.identifier import (
Identifier,
UserIdentifier,
parse_identifier_params,
)
from ..models.message import EmailMessage
from ..models.overall_status import OverallStatus
from ..models.qb_timeline import QBT, update_users_QBT
from ..models.questionnaire_bank import QuestionnaireBank, trigger_date
from ..models.questionnaire_response import QuestionnaireResponse
from ..models.reference import Reference
from ..models.research_study import (
ResearchStudy,
research_study_id_from_questionnaire
)
from ..models.role import ROLE
from ..models.user import User, current_user, get_user
from ..models.user_consent import consent_withdrawal_dates
from ..timeout_lock import ADHERENCE_DATA_KEY, CacheModeration
from .crossdomain import crossdomain
from .demographics import demographics
# Flask blueprint on which the patient-specific API views in this module
# are registered via ``@patient_api.route``.
patient_api = Blueprint('patient_api', __name__)
@patient_api.route('/api/patient/')
@crossdomain()
@oauth.require_oauth()
def patient_search():
    """Looks up patient(s) from given parameters, returns FHIR Patient bundle

    Takes key=value pairs to look up.  Email, identifier searches supported.

    Example searches:
        /api/patient/?email=username@server.com
        /api/patient/?identifier=http://us.truenth.org/identity-codes/external-study-id|123-45-678

    Identifier search pattern:
        ?identifier=<system>|<value>

    Deleted users and users for which the authenticated user does not have
    permission to view will NOT be included in the results.

    NB - the results are restricted to users with the patient role.  It is
    therefore possible to get no results from this and still see a unique email
    collision from existing non-patient users.

    NB - currently out of FHIR DSTU2 spec by default.  Include query string
    parameter ``patch_dstu2=True`` to properly return a FHIR bundle resource
    (https://www.hl7.org/fhir/DSTU2/bundle.html) naming the ``total`` matches
    and references to all matching patients.  With ``patch_dstu2=True``, the
    total will be zero if no matches are found, whereas the default (old,
    non-compliant) behavior is to return a 404 when no match is found.
    Please consider using the ``patch_dstu2=True`` parameter, as this will
    become the default behavior in the future.  An empty value, ``0`` or
    ``false`` (any case) for ``patch_dstu2`` is treated as not set.

    Returns a FHIR bundle resource (https://www.hl7.org/fhir/DSTU2/bundle.html)
    formatted in JSON for all matching valid, accessible patients, with
    ``patch_dstu2=True`` set (preferred).  Default returns single patient on a
    match, 404 on no match, and 400 for multiple as it's not supported.
    ---
    tags:
      - Patient
    operationId: patient_search
    produces:
      - application/json
    parameters:
      - name: search_parameters
        in: query
        description:
          Search parameters, such as `email` or `identifier`.  For
          identifier, URL-encode the `system` and `value` using '|' (pipe)
          delimiter, i.e. `api/patient/?identifier=http://fake.org/id|12a7`
        required: true
        type: string
      - name: patch_dstu2
        in: query
        description: whether or not to return DSTU2 compliant bundle
        required: false
        type: boolean
        default: false
    responses:
      200:
        description:
          Returns FHIR patient resource
          (https://www.hl7.org/fhir/DSTU2/patient.html) in JSON if a match
          is found.  Otherwise responds with a 404 status code.
      401:
        description:
          if missing valid OAuth token
      404:
        description:
          if there is no match found, or the user lacks permission to look
          up details on the match.
    security:
      - ServiceToken: []

    """
    # NB - `request.args.items()` is an iterator, which is always truthy.
    # Test the mapping itself to detect an empty query string.
    if not request.args:
        abort(400, "missing search criteria")

    query = User.query.filter(User.deleted_id.is_(None))
    for k, v in request.args.items():
        if k == 'email':
            query = query.filter(User.email == v)
        elif k == 'identifier':
            system, value = parse_identifier_params(v)
            if not (system and value):
                abort(400, "need 'system' and 'value' to look up identifier")
            query = query.join(UserIdentifier).join(
                Identifier).filter(and_(
                    UserIdentifier.identifier_id == Identifier.id,
                    Identifier.system == system,
                    Identifier._value == value))
        elif k == 'patch_dstu2':
            # not search criteria, but valid
            continue
        else:
            abort(400, "can't search on '{}' at this time".format(k))

    # Validate permissions to see every requested user - omitting those w/o
    patients = []
    for user in query:
        try:
            current_user().check_role(permission='view', other_id=user.id)
            if user.has_role(ROLE.PATIENT.value):
                patients.append(
                    {'resource': Reference.patient(user.id).as_fhir()})
        except Unauthorized:
            # Mask unauthorized as a not-found.  Don't want unauthed users
            # farming information - i.e. don't add to results
            auditable_event("looking up users with inadequate permission",
                            user_id=current_user().id, subject_id=user.id,
                            context='authentication')

    # Query string values are always strings; an explicit `false` (as sent by
    # clients honoring the documented boolean default) must not enable the
    # DSTU2 bundle format.
    patch_dstu2 = request.args.get(
        'patch_dstu2', '').strip().lower() not in ('', '0', 'false')

    if patch_dstu2:
        link = {"rel": "self", "href": request.url}
        return jsonify(bundle_results(elements=patients, links=[link]))

    # Emulate old results
    if not patients:
        abort(404)
    if len(patients) > 1:
        abort(400, "multiple results found, include `patch_dstu2=True`")

    ref = Reference.parse(patients[0]['resource'])
    return demographics(patient_id=ref.id)
@patient_api.route('/api/patient/<int:patient_id>/deceased', methods=('POST',))
@crossdomain()
@oauth.require_oauth()
def post_patient_deceased(patient_id):
    """POST deceased datetime or status for a patient

    This convenience API wraps the ability to set a patient's
    deceased status and deceased date time - generally the /api/demographics
    API should be preferred.
    ---
    operationId: deceased
    tags:
      - Patient
    produces:
      - application/json
    parameters:
      - name: patient_id
        in: path
        description: TrueNTH user ID
        required: true
        type: integer
        format: int64
      - in: body
        name: body
        schema:
          id: deceased_details
          properties:
            deceasedBoolean:
              type: string
              description:
                true or false.  Implicitly true with deceasedDateTime value
            deceasedDateTime:
              type: string
              description: valid FHIR datetime string defining time of death
    responses:
      200:
        description:
          Returns updated [FHIR patient
          resource](http://www.hl7.org/fhir/patient.html) in JSON.
      400:
        description:
          if given parameters don't function, such as a false
          deceasedBoolean AND a deceasedDateTime value.
      401:
        description:
          if missing valid OAuth token or logged-in user lacks permission
          to edit requested patient
    security:
      - ServiceToken: []

    """
    patient = get_user(patient_id, 'edit')

    # Require a JSON *object*; other valid JSON (list, string, number) has no
    # keys to inspect and would otherwise surface as a server error.
    payload = request.json
    if not isinstance(payload, dict) or set(payload).isdisjoint(
            {'deceasedDateTime', 'deceasedBoolean'}):
        abort(400, "Requires deceasedDateTime or deceasedBoolean in JSON")

    try:
        patient.update_deceased(payload)
    except ValueError as ve:
        if "Dates prior to year 1900 not supported" in str(ve):
            abort(
                400,
                "deceasedDateTime unrealistically historic (pre-1900),"
                f" please review: {payload.get('deceasedDateTime')}")
        else:
            # Not a condition handled here; propagate with original traceback
            raise

    db.session.commit()
    auditable_event("updated demographics on user {0} from input {1}".format(
        patient.id, json.dumps(payload)), user_id=current_user().id,
        subject_id=patient.id, context='user')

    return jsonify(patient.as_fhir(include_empties=False))
@patient_api.route(
    '/api/patient/<int:patient_id>/birthdate', methods=('POST',))
@patient_api.route(
    '/api/patient/<int:patient_id>/birthDate', methods=('POST',))
@oauth.require_oauth()
def post_patient_dob(patient_id):
    """POST date of birth for a patient

    This convenience API wraps the ability to set a patient's birthDate.
    Generally the /api/demographics API should be preferred.
    ---
    tags:
      - Patient
    produces:
      - application/json
    parameters:
      - name: patient_id
        in: path
        description: TrueNTH user ID
        required: true
        type: integer
        format: int64
      - in: body
        name: body
        schema:
          id: dob_details
          properties:
            birthDate:
              type: string
              description: valid FHIR date string defining date of birth
    responses:
      200:
        description:
          Returns updated [FHIR patient
          resource](http://www.hl7.org/fhir/patient.html) in JSON.
      400:
        description:
          if given parameters don't validate
      401:
        description:
          if missing valid OAuth token or logged-in user lacks permission
          to edit requested patient
    security:
      - ServiceToken: []

    """
    patient = get_user(patient_id, 'edit')

    # Require a JSON *object*.  A bare JSON string containing "birthDate"
    # would otherwise satisfy the membership test below.
    payload = request.json
    if not isinstance(payload, dict) or 'birthDate' not in payload:
        abort(400, "Requires `birthDate` in JSON")

    patient.update_birthdate(payload)
    db.session.commit()
    auditable_event("updated demographics on user {0} from input {1}".format(
        patient.id, json.dumps(payload)), user_id=current_user().id,
        subject_id=patient.id, context='user')

    return jsonify(patient.as_fhir(include_empties=False))
@patient_api.route('/api/patient/<int:patient_id>/timeline')
@oauth.require_oauth()
def patient_timeline(patient_id):
    """Display details for the user's Questionnaire Bank Timeline

    Optional query parameters
    :param purge: set 'true' to recreate QBTimeline, 'all' to also reset
      QNR -> QB assignments
    :param research_study_id: set to alternative research study ID - default 0
    :param trace: set 'true' to view detailed logs generated, works best in
      concert with purge
    :param only: set to filter all results but top level attribute given

    Responds with JSON including the keys ``consents``, ``rps``, ``status``,
    ``posted``, ``timeline``, ``adherence_data`` and ``qnr_data`` (plus
    ``trace`` when requested).  When ``only`` names one of those keys, the
    response is limited to that name and its value; an unknown name results
    in a 400.

    """
    from ..date_tools import FHIR_datetime, RelativeDelta
    from ..models.organization import (
        UserOrganization,
        OrganizationResearchProtocol,
    )
    from ..models.qbd import QBD
    from ..models.qb_status import QB_Status
    from ..models.questionnaire_bank import visit_name
    from ..models.questionnaire_response import aggregate_responses
    from ..models.research_protocol import ResearchProtocol
    from ..tasks import cache_single_patient_adherence_data
    from ..trace import dump_trace, establish_trace

    user = get_user(patient_id, permission='view')
    trace = request.args.get('trace', False)
    if trace:
        establish_trace("BEGIN time line lookup for {}".format(patient_id))

    try:
        research_study_id = int(request.args.get('research_study_id', 0))
    except ValueError:
        abort(400, "integer value required for 'research_study_id'")

    purge = request.args.get('purge', False)
    try:
        # If purge was given special 'all' value, also wipe out associated
        # questionnaire_response : qb relationships and remove cache lock
        # on adherence data.
        if purge == 'all':
            # remove adherence cache key to allow fresh run
            cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format(
                patient_id=patient_id,
                research_study_id=research_study_id))
            cache_moderation.reset()

            QuestionnaireResponse.purge_qb_relationship(
                subject_id=patient_id,
                research_study_id=research_study_id,
                acting_user_id=current_user().id)

        cache.delete_memoized(trigger_date)
        update_users_QBT(
            patient_id,
            research_study_id=research_study_id,
            invalidate_existing=purge)
    except ValueError as ve:
        abort(500, str(ve))

    results = []
    # We order by at (to get the latest status for a given QB) and
    # secondly by id, as on rare occasions, the time (`at`) of
    # `due` == `completed`, but the row insertion defines priority
    for qbt in QBT.query.filter(QBT.user_id == patient_id).filter(
            QBT.research_study_id == research_study_id).order_by(
            QBT.at, QBT.id):
        # build qbd for visit name
        qbd = QBD(
            relative_start=qbt.at, qb_id=qbt.qb_id,
            iteration=qbt.qb_iteration, recur_id=qbt.qb_recur_id)
        if qbt.status == OverallStatus.withdrawn:
            # withdrawn rows report only status and time
            results.append({
                'status': str(qbt.status),
                'at': FHIR_datetime.as_fhir(qbt.at)})
        else:
            data = {
                'status': str(qbt.status),
                'at': FHIR_datetime.as_fhir(qbt.at),
                'qb (id, iteration)': "{} ({}, {})".format(
                    qbd.questionnaire_bank.name, qbd.qb_id, qbd.iteration),
                'visit': visit_name(qbd)}
            if qbt.status == OverallStatus.due:
                data['questionnaires'] = ','.join(
                    [q.name for q in qbd.questionnaire_bank.questionnaires])
                data['expires'] = FHIR_datetime.as_fhir(
                    qbt.at + RelativeDelta(qbd.questionnaire_bank.expired))
            results.append(data)

    qb_names = {qb.id: qb.name for qb in QuestionnaireBank.query.all()}

    qnrs = QuestionnaireResponse.query.filter(
        QuestionnaireResponse.subject_id == patient_id).order_by(
        QuestionnaireResponse.document['authored'])

    def get_recur_id(qnr):
        """Return id of first recur on the QNR's questionnaire bank, if any"""
        if qnr.questionnaire_bank and qnr.questionnaire_bank.recurs:
            return qnr.questionnaire_bank.recurs[0].id
        return None

    # One summary line per posted questionnaire response
    posted = [
        "{}, {}, {} ({}, {}), {}, {}".format(
            qnr.id,
            qnr.document['authored'],
            visit_name(QBD(
                relative_start=None, qb_id=qnr.questionnaire_bank_id,
                iteration=qnr.qb_iteration, recur_id=get_recur_id(qnr))),
            qb_names.get(qnr.questionnaire_bank_id),
            qnr.qb_iteration,
            qnr.status,
            qnr.document['questionnaire']['reference'].split('/')[-1])
        for qnr in qnrs]

    # Research protocols reachable via the patient's organizations
    rp_query = ResearchProtocol.query.join(
        OrganizationResearchProtocol,
        OrganizationResearchProtocol.research_protocol_id ==
        ResearchProtocol.id).join(
        UserOrganization,
        UserOrganization.organization_id ==
        OrganizationResearchProtocol.organization_id).filter(
        UserOrganization.user_id == patient_id).with_entities(
        ResearchProtocol.name, OrganizationResearchProtocol.retired_as_of)
    rps = []
    for rp in rp_query:
        msg = rp.name
        if rp.retired_as_of:
            msg = f"{msg}, retired: {rp.retired_as_of}"
        rps.append(msg)

    qbstatus = QB_Status(
        user=User.query.get(patient_id),
        research_study_id=research_study_id,
        as_of_date=datetime.utcnow())
    prev_qbd = qbstatus.prev_qbd
    current = qbstatus.current_qbd()
    next_qbd = qbstatus.next_qbd
    status = {
        'overall': str(qbstatus.overall_status),
        'previous QBD': prev_qbd.as_json() if prev_qbd else None,
        'current QBD': current.as_json() if current else None,
        'next QBD': next_qbd.as_json() if next_qbd else None,
    }
    if current:
        status['due'] = qbstatus.due_date
        status['overdue'] = qbstatus.overdue_date
        status['expired'] = qbstatus.expired_date
        status['needing-full'] = qbstatus.instruments_needing_full_assessment()
        status['in-progress'] = qbstatus.instruments_in_progress()
        status['completed'] = qbstatus.instruments_completed()
    indef_qbd, indef_status = qbstatus.indef_status()
    if indef_qbd:
        status['indefinite QBD'] = indef_qbd.as_json()
        status['indefinite status'] = indef_status

    adherence_data = sorted_adherence_data(patient_id, research_study_id)
    if not adherence_data:
        # immediately following a cache purge, adherence data is gone and
        # needs to be recreated.
        kwargs = {
            "patient_id": user.id,
            "research_study_id": research_study_id,
        }
        cache_single_patient_adherence_data(**kwargs)
        adherence_data = sorted_adherence_data(patient_id, research_study_id)

    qnr_responses = aggregate_responses(
        instrument_ids=None,
        current_user=current_user(),
        research_study_id=research_study_id,
        patch_dstu2=True,
        ignore_qb_requirement=True,
        patient_ids=[patient_id]
    )
    # filter qnr data to a manageable result data set
    qnr_data = []
    for row in qnr_responses['entry']:
        i = {}
        d = row['resource']
        i['questionnaire'] = d['questionnaire']['reference'].split('/')[-1]

        # qnr_responses return all.  filter to requested research_study
        study_id = research_study_id_from_questionnaire(i['questionnaire'])
        if study_id != research_study_id:
            continue
        i['auth_method'] = d['encounter']['auth_method']
        i['encounter_period'] = d['encounter']['period']
        i['document_authored'] = d['authored']
        try:
            i['ae_session'] = d['identifier']['value']
        except KeyError:
            # happens with sub-study follow up, skip ae_session
            pass
        i['status'] = d['status']
        i['org'] = d['subject']['careProvider'][0]['display']
        i['visit'] = d['timepoint']
        qnr_data.append(i)

    consent_date, withdrawal_date = consent_withdrawal_dates(
        user, research_study_id)
    consents = {
        "consent_date": consent_date, "withdrawal_date": withdrawal_date}
    kwargs = {
        "consents": consents,
        "rps": rps,
        "status": status,
        "posted": posted,
        "timeline": results,
        "adherence_data": adherence_data,
        "qnr_data": qnr_data
    }
    if trace:
        kwargs["trace"] = dump_trace("END time line lookup")

    only = request.args.get('only', False)
    if only:
        if only not in kwargs:
            # Unknown attribute is a client error, not a server fault
            abort(400, f"'{only}' not in {sorted(kwargs)}")
        return jsonify(only, kwargs[only])
    return jsonify(**kwargs)
@patient_api.route('/api/patient/<int:patient_id>/timewarp/<int:days>')
@crossdomain()
@oauth.require_oauth()
def patient_timewarp(patient_id, days):
    """Debugging view to time warp patient's data back in time

    Shifts the patient's consent acceptance dates, questionnaire response
    authored dates, trigger state timestamps (when the `GIL` config value is
    None), reminder email sent dates and 'access' audit timestamps back by
    the requested number of days, then rebuilds the patient's timeline for
    every assigned research study.

    :param patient_id: the patient to time warp
    :param days: the number of days to move back - forward moves not supported
    :returns: JSON listing each modified row under the `changed` key

    NB: only available on testing and development systems.

    """
    from dateutil.relativedelta import relativedelta
    from copy import deepcopy
    from portal.models.questionnaire_response import QuestionnaireResponse
    from portal.models.user_consent import UserConsent

    if current_app.config['SYSTEM_TYPE'] == "production":
        abort(404)

    if days < 1:
        # Invalid request parameter - a client error, not a server fault
        abort(400, 'only possible to move a positive number of days back')

    user = get_user(patient_id, 'edit')

    # user_consent
    changed = []
    delta = relativedelta(days=days)
    for uc in UserConsent.query.filter(UserConsent.user_id == user.id):
        changed.append(f"user_consent {uc.id}")
        uc.acceptance_date = uc.acceptance_date - delta

    # questionnaire_responses
    for qnr in QuestionnaireResponse.query.filter(
            QuestionnaireResponse.subject_id == user.id):
        changed.append(f"questionnaire_response {qnr.id}")
        new_authored = FHIR_datetime.parse(qnr.document['authored']) - delta
        # work on a copy and reassign, rather than mutating the stored
        # document in place
        doc = deepcopy(qnr.document)
        doc['authored'] = FHIR_datetime.as_fhir(new_authored)
        qnr.document = doc

    # trigger_state
    if current_app.config['GIL'] is None:
        from ..trigger_states.models import TriggerState
        for ts in TriggerState.query.filter(
                TriggerState.user_id == user.id):
            changed.append(f"trigger_state {ts.id}")
            ts.timestamp = ts.timestamp - delta
            if ts.triggers is not None:
                triggers = deepcopy(ts.triggers)
                # Some early records don't include source-authored
                if 'authored' in triggers['source']:
                    triggers['source']['authored'] = FHIR_datetime.as_fhir(
                        FHIR_datetime.parse(triggers['source']['authored'])
                        - delta)
                if 'actions' in triggers:
                    for email in triggers['actions']['email']:
                        email['timestamp'] = FHIR_datetime.as_fhir(
                            FHIR_datetime.parse(email['timestamp']) - delta)
                ts.triggers = triggers

    # reminder email dates
    for em in EmailMessage.query.join(
            Communication, Communication.message_id == EmailMessage.id).filter(
            EmailMessage.recipient_id == user.id):
        changed.append(f'email_message {em.id}')
        em.sent_at = em.sent_at - delta

    # access records in audit
    from ..models.audit import Audit
    query = Audit.query.filter(Audit.subject_id == user.id).filter(
        Audit._context == 'access')
    for ar in query:
        changed.append(f"audit {ar.id}")
        ar.timestamp = ar.timestamp - delta

    db.session.commit()

    # Recalculate users timeline & qnr associations
    cache.delete_memoized(trigger_date)
    for research_study_id in ResearchStudy.assigned_to(user):
        QuestionnaireResponse.purge_qb_relationship(
            subject_id=patient_id,
            research_study_id=research_study_id,
            acting_user_id=current_user().id)
        update_users_QBT(
            patient_id,
            research_study_id=research_study_id,
            invalidate_existing=True)

    auditable_event(
        message=f"TIME WARPED existing data back {days} days.",
        user_id=current_user().id,
        subject_id=patient_id,
        context="assessment"
    )
    return jsonify(changed=changed)