diff --git a/manage.py b/manage.py index 8576702ad..13cadc19d 100644 --- a/manage.py +++ b/manage.py @@ -615,94 +615,6 @@ def update_qnr(qnr_id, link_id, actor, noop, replacement): click.echo(message) -@click.option('--subject_id', type=int, multiple=True, help="Subject user ID", required=True) -@click.option( - '--actor', - default="__system__", - required=False, - help='email address of user taking this action, for audit trail' -) -@app.cli.command() -def remove_post_withdrawn_qnrs(subject_id, actor): - """Remove QNRs posted beyond subject's withdrawal date""" - from sqlalchemy.types import DateTime - from portal.cache import cache - from portal.models.questionnaire_bank import trigger_date - - rs_id = 0 # only base study till need arises - acting_user = get_actor(actor, require_admin=True) - - for subject_id in subject_id: - # Confirm user has withdrawn - subject = get_target(id=subject_id) - study_id = subject.external_study_id - - # Make sure we're not working w/ stale timeline data - QuestionnaireResponse.purge_qb_relationship( - subject_id=subject_id, - research_study_id=rs_id, - acting_user_id=acting_user.id) - cache.delete_memoized(trigger_date) - update_users_QBT( - subject_id, - research_study_id=rs_id, - invalidate_existing=True) - - deceased_date = None if not subject.deceased else subject.deceased.timestamp - withdrawn_visit = QBT.withdrawn_qbd(subject_id, rs_id) - if not withdrawn_visit: - raise ValueError("Only applicable to withdrawn users") - - # Obtain all QNRs submitted beyond withdrawal date - query = QuestionnaireResponse.query.filter( - QuestionnaireResponse.document["authored"].astext.cast(DateTime) > - withdrawn_visit.relative_start - ).filter( - QuestionnaireResponse.subject_id == subject_id).with_entities( - QuestionnaireResponse.id, - QuestionnaireResponse.questionnaire_bank_id, - QuestionnaireResponse.qb_iteration, - QuestionnaireResponse.document["questionnaire"]["reference"]. - label("instrument"), - QuestionnaireResponse.document["authored"]. - label("authored") - ).order_by(QuestionnaireResponse.document["authored"]) - - for qnr in query: - # match format in bug report for easy diff - sub_padding = " "*(11 - len(str(subject_id))) - stdy_padding = " "*(12 - len(study_id)) - out = ( - f"{sub_padding}{subject_id} | " - f"{study_id}{stdy_padding}| " - f"{withdrawn_visit.relative_start.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} | " - f"{qnr.authored} | ") - - # do not include any belonging to the last active visit, unless - # they came in after deceased date - if ( - qnr.questionnaire_bank_id == withdrawn_visit.qb_id and - qnr.qb_iteration == withdrawn_visit.iteration and - (not deceased_date or FHIR_datetime.parse( - qnr.authored) < deceased_date)): - print(f"{out}keep") - continue - if "irondemog" in qnr.instrument: - print(f"{out}keep (indefinite)") - continue - print(f"{out}delete") - db.session.delete(QuestionnaireResponse.query.get(qnr.id)) - auditable_event( - message=( - "deleted questionnaire response submitted beyond " - "withdrawal visit as per request by PCCTC"), - context="assessment", - user_id=acting_user.id, - subject_id=subject_id) - db.session.commit() - return - - @click.option('--src_id', type=int, help="Source Patient ID (WILL BE DELETED!)") @click.option('--tgt_id', type=int, help="Target Patient ID") @click.option( diff --git a/portal/config/config.py b/portal/config/config.py index 0631e4795..d3fecfc07 100644 --- a/portal/config/config.py +++ b/portal/config/config.py @@ -1,8 +1,7 @@ """Configuration""" import os -import redis - +from portal.factories.redis import create_redis from portal.models.role import ROLE SITE_CFG = 'site.cfg' @@ -152,7 +151,7 @@ class BaseConfig(object): REDIS_URL ) - SESSION_REDIS = redis.from_url(SESSION_REDIS_URL) + SESSION_REDIS = create_redis(SESSION_REDIS_URL) UPDATE_PATIENT_TASK_BATCH_SIZE = int( os.environ.get('UPDATE_PATIENT_TASK_BATCH_SIZE', 16) diff --git a/portal/config/eproms/Questionnaire.json b/portal/config/eproms/Questionnaire.json index 9a1cc079d..2ace521d2 100644 --- a/portal/config/eproms/Questionnaire.json +++ b/portal/config/eproms/Questionnaire.json @@ -5851,6 +5851,42 @@ "display": "Other", "code": "irondemog_v3.26.8" } + }, + { + "valueCoding": { + "display": "African", + "code": "irondemog_v3.26.9" + } + }, + { + "valueCoding": { + "display": "Black", + "code": "irondemog_v3.26.10" + } + }, + { + "valueCoding": { + "display": "Coloured", + "code": "irondemog_v3.26.11" + } + }, + { + "valueCoding": { + "display": "Indian", + "code": "irondemog_v3.26.12" + } + }, + { + "valueCoding": { + "display": "White / Caucasian", + "code": "irondemog_v3.26.13" + } + }, + { + "valueCoding": { + "display": "Other", + "code": "irondemog_v3.26.14" + } } ] }, diff --git a/portal/factories/redis.py b/portal/factories/redis.py new file mode 100644 index 000000000..d5debfb57 --- /dev/null +++ b/portal/factories/redis.py @@ -0,0 +1,4 @@ +import redis + +def create_redis(url): + return redis.Redis.from_url(url) diff --git a/portal/migrations/versions/3c871e710277_.py b/portal/migrations/versions/3c871e710277_.py index 77d0691d6..85aed5bdf 100644 --- a/portal/migrations/versions/3c871e710277_.py +++ b/portal/migrations/versions/3c871e710277_.py @@ -7,11 +7,12 @@ """ from alembic import op from sqlalchemy.orm import sessionmaker -from sqlalchemy.sql.functions import count +from sqlalchemy.sql.functions import func from portal.cache import cache +from portal.models.adherence_data import AdherenceData from portal.models.research_study import BASE_RS_ID, EMPRO_RS_ID -from portal.models.qb_timeline import update_users_QBT +from portal.models.qb_timeline import QBT, update_users_QBT from portal.models.questionnaire_bank import trigger_date from portal.models.questionnaire_response import ( QuestionnaireResponse, @@ -129,21 +130,25 @@ def upgrade(): session = Session(bind=bind) for study_id in (BASE_RS_ID, EMPRO_RS_ID): - subquery = session.query(UserConsent.user_id).distinct().filter( + # due to changes in adherence report for withdrawn users + # this query is now simply any withdrawn patient who isn't + # deleted from the system. + subquery = session.query(User.id).filter( + User.deleted_id.is_(None)).subquery() + query = session.query(UserConsent.user_id.distinct()).filter( UserConsent.research_study_id == study_id).filter( - UserConsent.status == 'suspended').subquery() - query = session.query( - count(UserConsent.user_id), UserConsent.user_id).filter( - UserConsent.research_study_id == study_id).filter( - UserConsent.user_id.in_(subquery)).group_by( - UserConsent.user_id).having(count(UserConsent.user_id) > 2) - for num, patient_id in query: + UserConsent.status == "suspended").filter( + UserConsent.user_id.in_(subquery)) + + delay_timeline_updates_till_after_migration = True + slow_report_details = False + delete_adh_ids = [] + for row in query: + patient_id = row[0] if patient_id in (719, 1186, 1305): # special cases best left alone continue user = User.query.get(patient_id) - if user.deleted: - continue consent_date, withdrawal_date = consent_withdrawal_dates( user, study_id) if withdrawal_date is None: @@ -152,49 +157,68 @@ def upgrade(): # no change needed in this situation continue - # report if dates don't match spreadsheet in IRONN-210 - cd_str = '{dt.day}-{dt:%b}-{dt:%y}'.format(dt=consent_date) - wd_str = '{dt.day}-{dt:%b}-{dt:%y}'.format(dt=withdrawal_date) - try: - match = verified_user_consent_dates[study_id][patient_id] - if (cd_str, wd_str) != match: - print(f"user_id {patient_id} \t {cd_str} \t {wd_str}") - print(" vs expected:") - print(f"\t\t {match[0]} \t {match[1]}") - except KeyError: - # user found to not see timeline change - pass - - # fake an adherence cache run to avoid unnecessary and more - # important, to prevent from locking out a subsequent update - # needed after recognizing a real change below - adherence_cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( - patient_id=patient_id, - research_study_id=study_id)) - adherence_cache_moderation.run_now() - - b4_state = capture_patient_state(patient_id) - update_users_QBT( - patient_id, - research_study_id=study_id, - invalidate_existing=True) - _, _, _, any_changes = present_before_after_state( - patient_id, study_id, b4_state) - if not any_changes: - continue + if slow_report_details: + # report if dates don't match spreadsheet in IRONN-210 + cd_str = '{dt.day}-{dt:%b}-{dt:%y}'.format(dt=consent_date) + wd_str = '{dt.day}-{dt:%b}-{dt:%y}'.format(dt=withdrawal_date) + try: + match = verified_user_consent_dates[study_id][patient_id] + if (cd_str, wd_str) != match: + print(f"user_id {patient_id} \t {cd_str} \t {wd_str}") + print(" vs expected:") + print(f"\t\t {match[0]} \t {match[1]}") + except KeyError: + # user found to not see timeline change + pass + + # fake an adherence cache run to avoid unnecessary and more + # important, to prevent from locking out a subsequent update + # needed after recognizing a real change below + adherence_cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( + patient_id=patient_id, + research_study_id=study_id)) + adherence_cache_moderation.run_now() + + b4_state = capture_patient_state(patient_id) + update_users_QBT( + patient_id, + research_study_id=study_id, + invalidate_existing=True) + _, _, _, any_changes = present_before_after_state( + patient_id, study_id, b4_state) + if not any_changes: + continue + + print(f"{patient_id} changed, purge old adherence data and relationships") + adherence_cache_moderation.reset() - print(f"{patient_id} changed, purge old adherence data and relationships") - adherence_cache_moderation.reset() QuestionnaireResponse.purge_qb_relationship( subject_id=patient_id, research_study_id=study_id, acting_user_id=patient_id) cache.delete_memoized(trigger_date) - update_users_QBT( - patient_id, - research_study_id=study_id, - invalidate_existing=True) + if delay_timeline_updates_till_after_migration: + session.query(QBT).filter(QBT.user_id == patient_id).filter( + QBT.research_study_id == study_id).delete() + adh_ids = session.query(AdherenceData.id).filter( + AdherenceData.patient_id == patient_id).filter( + AdherenceData.rs_id_visit.like(f"{study_id}:%") + ) + for ad_id in adh_ids: + delete_adh_ids.append(ad_id) + else: + update_users_QBT( + patient_id, + research_study_id=study_id, + invalidate_existing=True) + + # SQL alchemy can't combine `like` expression with delete op. + for ad_id in delete_adh_ids: + # yes this should be possible in a single stmt, + # not a loop, but no dice + session.query(AdherenceData).filter( + AdherenceData.id == ad_id).delete() def downgrade(): """no downgrade available""" diff --git a/portal/migrations/versions/66368e673005_.py b/portal/migrations/versions/66368e673005_.py index fb788ac1b..cd021f96a 100644 --- a/portal/migrations/versions/66368e673005_.py +++ b/portal/migrations/versions/66368e673005_.py @@ -7,9 +7,11 @@ """ from alembic import op from datetime import datetime -import sqlalchemy as sa from sqlalchemy.orm import sessionmaker +from portal.models.user import User +from portal.models.user_consent import consent_withdrawal_dates + # revision identifiers, used by Alembic. revision = '66368e673005' @@ -49,6 +51,14 @@ def upgrade(): if status and status[0] != "Not Yet Available": continue + # if the patient is withdrawn, skip over, will get picked + # up in migration 3c871e710277, going out in same release + patient = User.query.get(patient_id) + _, withdrawal_date = consent_withdrawal_dates( + patient, 1) + if withdrawal_date: + continue + # purge the user's EMPRO adherence rows to force refresh session.execute( "DELETE FROM adherence_data WHERE" diff --git a/portal/models/adherence_data.py b/portal/models/adherence_data.py index a633d2770..f70473656 100644 --- a/portal/models/adherence_data.py +++ b/portal/models/adherence_data.py @@ -2,9 +2,10 @@ from datetime import datetime, timedelta from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy import UniqueConstraint +import re from ..database import db - +withdrawn = " post-withdrawn" class AdherenceData(db.Model): """ Cached adherence report data @@ -35,16 +36,20 @@ class AdherenceData(db.Model): 'patient_id', 'rs_id_visit', name='_adherence_unique_patient_visit'),) @staticmethod - def rs_visit_string(rs_id, visit_string): + def rs_visit_string(rs_id, visit_string, post_withdrawn=False): """trivial helper to build rs_id_visit string into desired format""" assert isinstance(rs_id, int) assert visit_string + if post_withdrawn: + visit_string += withdrawn return f"{rs_id}:{visit_string}" def rs_visit_parse(self): """break parts of rs_id and visit_string out of rs_id_visit field""" rs_id, visit_string = self.rs_id_visit.split(':') assert visit_string + if visit_string.endswith(withdrawn): + visit_string = visit_string[:-(len(withdrawn))] return int(rs_id), visit_string @staticmethod @@ -92,15 +97,24 @@ def sort_by_visit_key(d): :returns: list of values sorted by keys """ + pattern = re.compile(f"Month ([0-9]+)({withdrawn})?") def sort_key(key): if key == 'Baseline': return 0, 0 + elif key == f"Baseline{withdrawn}": + return 0, 1 elif key == 'Indefinite': return 2, 0 + elif key == f'Indefinite{withdrawn}': + return 2, 1 else: - month, num = key.split(" ") - assert month == "Month" - return 1, int(num) + match = pattern.match(key) + if not match.groups(): + raise ValueError(f"couldn't parse key {key}") + month_num = int(match.groups()[0]) + if match.groups()[1]: + month_num += 100 + return 1, month_num sorted_keys = sorted(d.keys(), key=sort_key) sorted_values = [d[key] for key in sorted_keys] diff --git a/portal/models/qb_status.py b/portal/models/qb_status.py index 83711bb62..26422a0ea 100644 --- a/portal/models/qb_status.py +++ b/portal/models/qb_status.py @@ -77,6 +77,11 @@ def _sync_timeline(self): # locate current qb - last found with start <= self.as_of_date cur_index, cur_qbd = None, None for i, qbd in zip(range(len(self.__ordered_qbs)), self.__ordered_qbs): + if self._withdrawal_date and ( + qbd.relative_start > self._withdrawal_date): + # as we now keep timeline data beyond withdrawal, break + # out if the requested date is beyond withdrawal + break if qbd.relative_start <= self.as_of_date: cur_index = i cur_qbd = qbd diff --git a/portal/models/qb_timeline.py b/portal/models/qb_timeline.py index b1d110070..aee9822af 100644 --- a/portal/models/qb_timeline.py +++ b/portal/models/qb_timeline.py @@ -4,7 +4,6 @@ from dateutil.relativedelta import relativedelta from flask import current_app -import redis from redis.exceptions import ConnectionError from sqlalchemy.types import Enum as SQLA_Enum from werkzeug.exceptions import BadRequest @@ -13,13 +12,9 @@ from ..cache import cache, TWO_HOURS from ..database import db from ..date_tools import FHIR_datetime, RelativeDelta -from ..factories.celery import create_celery +from ..factories.redis import create_redis from ..set_tools import left_center_right -from ..timeout_lock import ( - ADHERENCE_DATA_KEY, - CacheModeration, - TimeoutLock, -) +from ..timeout_lock import ADHERENCE_DATA_KEY, CacheModeration, TimeoutLock from ..trace import trace from .adherence_data import AdherenceData from .overall_status import OverallStatus @@ -107,25 +102,6 @@ def timeline_state(user_id): vn, name_map[i.qb_id], i.qb_iteration] return results - @staticmethod - def withdrawn_qbd(user_id, research_study_id): - """Returns active QBD at time of user's withdrawal if applicable - - :returns: a QBD representing the visit active at point of withdrawal - from given study, using `relative_start` to hold date-time of - withdrawal; or None if n/a - """ - qbt = QBT.query.filter(QBT.user_id == user_id).filter( - QBT.research_study_id == research_study_id).filter( - QBT.status == OverallStatus.withdrawn).first() - if not qbt: - return None - return QBD( - relative_start=qbt.at, - iteration=qbt.qb_iteration, - recur_id=qbt.qb_recur_id, - qb_id=qbt.qb_id) - class AtOrderedList(list): """Specialize ``list`` to maintain insertion order and ``at`` attribute @@ -291,6 +267,8 @@ def calc_and_adjust_start(user, research_study_id, qbd, initial_trigger): return qbd.relative_start delta = users_trigger - initial_trigger + # this case should no longer be possible; raise the alarm + raise RuntimeError("found initial trigger to differ by: %s", str(delta)) current_app.logger.debug("calc_and_adjust_start delta: %s", str(delta)) return qbd.relative_start + delta @@ -606,7 +584,7 @@ def ordered_qbs(user, research_study_id, classification=None): This does NOT include the indefinite classification unless requested, as it plays by a different set of rules. - :param user: the user to lookup + :param user: the user to look up :param research_study_id: the research study being processed :param classification: set to ``indefinite`` for that special handling :returns: QBD for each (QB, iteration, recur) @@ -725,11 +703,6 @@ def ordered_qbs(user, research_study_id, classification=None): if transition_now: rp_flyweight.transition() - # done if user withdrew before QB starts - if withdrawal_date and withdrawal_date < rp_flyweight.cur_start: - trace("withdrawn as of {}".format(withdrawal_date)) - break - rp_flyweight.adjust_start() yield rp_flyweight.cur_qbd @@ -788,6 +761,17 @@ def invalidate_users_QBT(user_id, research_study_id): for ad in adh_data: db.session.delete(ad) + if not current_app.config.get("TESTING", False): + # clear the timeout lock as well, since we need a refresh + # after deletion of the adherence data + # otherwise, we experience a deadlock situation where tables can't be dropped + # between test runs, as postgres believes a deadlock condition exists + cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( + patient_id=user_id, + research_study_id=research_study_id)) + cache_moderation.reset() + + # args have to match order and values - no wild carding avail as_of = QB_StatusCacheKey().current() if research_study_id != 'all': @@ -888,6 +872,7 @@ def update_users_QBT(user_id, research_study_id, invalidate_existing=False): def attempt_update(user_id, research_study_id, invalidate_existing): """Updates user's QBT or raises if lock is unattainable""" from .qb_status import patient_research_study_status + from ..tasks import LOW_PRIORITY, cache_single_patient_adherence_data # acquire a multiprocessing lock to prevent multiple requests # from duplicating rows during this slow process @@ -935,7 +920,7 @@ def attempt_update(user_id, research_study_id, invalidate_existing): trace(f"user determined ineligible for {research_study_id}") return - # Create time line for user, from initial trigger date + # Create time-line for user, from initial trigger date qb_generator = ordered_qbs(user, research_study_id) user_qnrs = QNR_results(user, research_study_id) @@ -981,7 +966,7 @@ def attempt_update(user_id, research_study_id, invalidate_existing): # QBs - one needing to be removed (say the old # month 36) in favor of the skipped new (say # month 33), and the last legit old one (say - # month 30) needing it's endpoint adjusted + # month 30) needing its endpoint adjusted # further below. remove_qb_id = pending_qbts[i].qb_id remove_iteration = pending_qbts[i].qb_iteration @@ -1056,7 +1041,7 @@ def attempt_update(user_id, research_study_id, invalidate_existing): "Problematic qbd: %s", user_id, str(qbd)) continue - # Must double check overlap; may no longer be true, if + # Must double-check overlap; may no longer be true, if # last_posted_index was one before... if pending_qbts[last_posted_index].at > start: # For questionnaires with common instrument names that @@ -1172,25 +1157,27 @@ def attempt_update(user_id, research_study_id, invalidate_existing): pending_qbts.append(QBT( at=expired_date, status='expired', **kwargs)) - # If user withdrew from study - remove any rows post withdrawal + # If user withdrew from study, add a row marking the withdrawal + # to the user's timeline, at the proper sequence. num_stored = 0 _, withdrawal_date = consent_withdrawal_dates( user, research_study_id=research_study_id) if withdrawal_date: trace("withdrawn as of {}".format(withdrawal_date)) - store_rows = [ - qbt for qbt in pending_qbts if qbt.at < withdrawal_date] - if store_rows: - # To satisfy the `Withdrawn sanity check` in qb_status - # the withdrawn row needs to match the last valid qb - kwargs['qb_id'] = store_rows[-1].qb_id - kwargs['qb_iteration'] = store_rows[-1].qb_iteration - kwargs['qb_recur_id'] = store_rows[-1].qb_recur_id - - store_rows.append(QBT( - at=withdrawal_date, - status='withdrawn', - **kwargs)) + j = 0 + for qbt in pending_qbts: + if qbt.at > withdrawal_date: + break + j += 1 + if j > 0: + # include visit in withdrawn for qb_status functionality + kwargs['qb_id'] = pending_qbts[j-1].qb_id + kwargs['qb_iteration'] = pending_qbts[j-1].qb_iteration + kwargs['qb_recur_id'] = pending_qbts[j-1].qb_recur_id + store_rows = ( + pending_qbts[0:j] + + [QBT(at=withdrawal_date, status='withdrawn', **kwargs)] + + pending_qbts[j:]) check_for_overlaps(store_rows) db.session.add_all(store_rows) num_stored = len(store_rows) @@ -1206,19 +1193,12 @@ def attempt_update(user_id, research_study_id, invalidate_existing): db.session.commit() # With fresh calculation of a user's timeline, queue update of - # user's adherence data as celery job, avoiding recursive issues - # if this call happens to be part of an already running update - cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( - patient_id=user_id, - research_study_id=research_study_id)) - if not cache_moderation.run_recently(): - kwargs = { - 'patient_id': user_id, - 'research_study_id': research_study_id} - celery = create_celery(current_app) - celery.send_task( - 'portal.tasks.cache_adherence_data_task', - kwargs=kwargs) + # user's adherence data as celery job + kwargs = { + 'patient_id': user_id, + 'research_study_id': research_study_id} + cache_single_patient_adherence_data.apply_async( + kwargs=kwargs, queue=LOW_PRIORITY, retry=False) success = False for attempt in range(1, 6): @@ -1264,8 +1244,7 @@ def __init__(self): # Lookup the configured expiration of the matching cache # container ("DOGPILE_CACHE_REGIONS" -> "assessment_cache_region") if self.redis is None: - self.redis = redis.StrictRedis.from_url( - current_app.config['REDIS_URL']) + self.redis = create_redis(current_app.config['REDIS_URL']) regions = current_app.config['DOGPILE_CACHE_REGIONS'] for region_name, duration in regions: if region_name == self.region_name: diff --git a/portal/models/reporting.py b/portal/models/reporting.py index 6b10c976f..6292f383e 100644 --- a/portal/models/reporting.py +++ b/portal/models/reporting.py @@ -35,7 +35,7 @@ from .user_consent import consent_withdrawal_dates -def single_patient_adherence_data(patient, as_of_date, research_study_id): +def single_patient_adherence_data(patient_id, research_study_id): """Update any missing (from cache) adherence data for patient NB: all changes are side effects, persisted in adherence_data table. @@ -48,8 +48,9 @@ def single_patient_adherence_data(patient, as_of_date, research_study_id): :returns: number of added rows """ + as_of_date = datetime.utcnow() cache_moderation = CacheModeration(key=ADHERENCE_DATA_KEY.format( - patient_id=patient.id, + patient_id=patient_id, research_study_id=research_study_id)) if cache_moderation.run_recently(): return 0 @@ -86,28 +87,22 @@ def patient_data(patient): def general_row_detail(row, patient, qbd): """Add general (either study) data for given (patient, qbd)""" # purge values that may have previous row data set and aren't certain - for key in "completion_date", "oow_completion_date", "entry_method": + for key in "oow_completion_date", "entry_method", "visit": row.pop(key, None) row['qb'] = qbd.questionnaire_bank.name - row['visit'] = visit_name(qbd) - # Withdrawn users that happened to have completed their last QB - # are treated as "Completed" - withdrawn_and_completed = ( - row['status'] == 'Withdrawn' and qbd.completed_date(patient.id)) - if row['status'] == 'Completed' or withdrawn_and_completed: - row['completion_date'] = report_format( - qbd.completed_date(patient.id)) or "" - row['oow_completion_date'] = report_format( - qbd.oow_completed_date(patient.id)) or "" - row['status'] = 'Completed' + + # if withdrawn, include a row with that and little more if row['status'] == 'Withdrawn': - # visit unreliable when withdrawn - clear - row['visit'] = '' # use date of withdrawal for "completion date" _, withdrawal_date = consent_withdrawal_dates( user=patient, research_study_id=research_study_id) row['completion_date'] = report_format(withdrawal_date) + return + row['visit'] = visit_name(qbd) + row['completion_date'] = ( + report_format(qbd.completed_date(patient.id)) + if row['status'] == 'Completed' else '') entry_method = QNR_results( patient, research_study_id=research_study_id, @@ -118,6 +113,9 @@ def general_row_detail(row, patient, qbd): def empro_row_detail(row, ts_reporting): """Add EMPRO specifics""" + if not ts_reporting: + return + # Rename column header for EMPRO if 'completion_date' in row: row['EMPRO_questionnaire_completion_date'] = ( @@ -144,6 +142,7 @@ def empro_row_detail(row, ts_reporting): row['content_domains_accessed'] = ', '.join(da) if da else "" added_rows = 0 + patient = User.query.get(patient_id) qb_stats = QB_Status( user=patient, research_study_id=research_study_id, @@ -176,12 +175,12 @@ def empro_row_detail(row, ts_reporting): if not exp_row or exp_row.at > as_of_date: row["status"] = "Not Yet Available" + ts_reporting = ( + TriggerStatesReporting(patient_id=patient.id) + if research_study_id == EMPRO_RS_ID else None) if last_viable: general_row_detail(row, patient, last_viable) - if research_study_id == EMPRO_RS_ID: - # Initialize trigger states reporting for patient - ts_reporting = TriggerStatesReporting(patient_id=patient.id) - empro_row_detail(row, ts_reporting) + empro_row_detail(row, ts_reporting) # latest is only valid for a day, unless the user withdrew valid_for = 30 if row['status'] in ('Expired', 'Withdrawn') else 1 @@ -192,6 +191,43 @@ def empro_row_detail(row, ts_reporting): data=row) added_rows += 1 + # if the last row was withdrawn, add any completed visits beyond + # date of withdrawal + if row["status"] == 'Withdrawn': + withdrawal_date = ( + row['completion_date'] if 'completion_date' in row + else row['EMPRO_questionnaire_completion_date']) + missing_qbts = [] + completed_after_withdrawn = QBT.query.filter( + QBT.at > withdrawal_date).filter( + QBT.status == OverallStatus.completed).filter( + QBT.research_study_id == research_study_id).filter( + QBT.user_id == patient.id).order_by(QBT.at) + for qbt in completed_after_withdrawn: + missing_qbts.append((qbt.at, qbt.qbd())) + + # one more special case! the withdrawn visit was completed + # but BEFORE the user withdrew. the qb_status accurately sees + # the visit as withdrawn, and wrote that to the last row, but + # failed to write out the completed status first. + pre_wd_visit_cd = last_viable.completed_date(patient.id) + if pre_wd_visit_cd and not [ + x for x, y in missing_qbts if x == pre_wd_visit_cd]: + missing_qbts.append((pre_wd_visit_cd, last_viable)) + + for at, qbd in missing_qbts: + row['status'] = 'Completed' # overwrite withdrawn state + general_row_detail(row, patient, qbd) + empro_row_detail(row, ts_reporting) + rs_visit = AdherenceData.rs_visit_string( + research_study_id, row['visit'], post_withdrawn=True) + AdherenceData.persist( + patient_id=patient.id, + rs_id_visit=rs_visit, + valid_for_days=30, + data=row) + added_rows += 1 + # as we require a full history, continue to add rows for each previous for qbd, status in qb_stats.older_qbds(last_viable): rs_visit = AdherenceData.rs_visit_string( @@ -204,9 +240,7 @@ def empro_row_detail(row, ts_reporting): historic = row.copy() historic['status'] = status general_row_detail(historic, patient, qbd) - - if research_study_id == EMPRO_RS_ID: - empro_row_detail(historic, ts_reporting) + empro_row_detail(historic, ts_reporting) AdherenceData.persist( patient_id=patient.id, rs_id_visit=rs_visit, @@ -276,6 +310,7 @@ def cache_adherence_data( if limit was hit """ + from ..tasks import cache_single_patient_adherence_data # For building cache, use system account; skip privilege checks acting_user = User.query.filter_by(email='__system__').one() as_of_date = datetime.utcnow() @@ -304,11 +339,16 @@ def patient_generator(): added_rows = 0 for patient in patient_generator(): - if added_rows > limit: + if limit and added_rows > limit: current_app.logger.info( "pre-mature exit caching adherence data having hit limit") break - single_patient_adherence_data(patient, as_of_date, research_study_id) + # queue patient's adherence cache refresh as a separate job + kwargs = { + 'patient_id': patient.id, + 'research_study_id': research_study_id} + cache_single_patient_adherence_data.apply_async( + kwargs=kwargs, retry=False) return {'added': added_rows, 'limit_hit': limit and added_rows > limit} diff --git a/portal/tasks.py b/portal/tasks.py index f53a9b20d..be2317f5b 100644 --- a/portal/tasks.py +++ b/portal/tasks.py @@ -14,13 +14,13 @@ from celery.utils.log import get_task_logger from flask import current_app -import redis from requests import Request, Session from requests.exceptions import RequestException from sqlalchemy import and_ from .database import db from .factories.app import create_app +from .factories.redis import create_redis from .factories.celery import create_celery from .models.communication import Communication from .models.communication_request import queue_outstanding_messages @@ -32,6 +32,7 @@ cache_adherence_data, generate_and_send_summaries, research_report, + single_patient_adherence_data, ) from .models.research_study import ResearchStudy from .models.role import ROLE, Role @@ -113,9 +114,16 @@ def info(): queue=LOW_PRIORITY) @scheduled_task def cache_adherence_data_task(**kwargs): + """Queues up all patients needing a cache refresh""" return cache_adherence_data(**kwargs) +@celery.task(queue=LOW_PRIORITY, ignore_results=True) +def cache_single_patient_adherence_data(**kwargs): + """Populates adherence data for a single patient""" + return single_patient_adherence_data(**kwargs) + + @celery.task(bind=True, track_started=True, queue=LOW_PRIORITY) def adherence_report_task(self, **kwargs): current_app.logger.debug("launch adherence report task: %s", self.request.id) @@ -393,7 +401,7 @@ def token_watchdog(**kwargs): def celery_beat_health_check(**kwargs): """Refreshes self-expiring redis value for /healthcheck of celerybeat""" - rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) return rs.setex( name='last_celery_beat_ping', time=current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'], @@ -406,7 +414,7 @@ def celery_beat_health_check(**kwargs): def celery_beat_health_check_low_priority_queue(**kwargs): """Refreshes self-expiring redis value for /healthcheck of celerybeat""" - rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) return rs.setex( name='last_celery_beat_ping_low_priority_queue', time=10*current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'], diff --git a/portal/timeout_lock.py b/portal/timeout_lock.py index b33782d88..21ab8af39 100644 --- a/portal/timeout_lock.py +++ b/portal/timeout_lock.py @@ -1,8 +1,8 @@ import time from flask import current_app -import redis +from .factories.redis import create_redis class LockTimeout(BaseException): """Exception raised when wait for TimeoutLock exceeds timeout""" @@ -31,8 +31,7 @@ def __init__(self, key, expires=60, timeout=10): self.key = key self.timeout = timeout self.expires = expires - self.redis = redis.StrictRedis.from_url( - current_app.config['REDIS_URL']) + self.redis = create_redis(current_app.config['REDIS_URL']) def __enter__(self): timeout = self.timeout @@ -105,8 +104,7 @@ class CacheModeration(object): def __init__(self, key, timeout=300): self.key = key self.timeout = timeout - self.redis = redis.StrictRedis.from_url( - current_app.config['REDIS_URL']) + self.redis = create_redis(current_app.config['REDIS_URL']) def run_recently(self): """if key has value in redis (i.e. didn't expire) return value""" diff --git a/portal/views/healthcheck.py b/portal/views/healthcheck.py index af96c5db3..be84061a8 100644 --- a/portal/views/healthcheck.py +++ b/portal/views/healthcheck.py @@ -3,12 +3,12 @@ from celery.exceptions import TimeoutError from celery.result import AsyncResult from flask import Blueprint, current_app -import redis from redis.exceptions import ConnectionError from sqlalchemy import text from ..database import db from ..factories.celery import create_celery +from ..factories.redis import create_redis HEALTHCHECK_FAILURE_STATUS_CODE = 200 @@ -23,7 +23,7 @@ def celery_beat_ping(): This allows us to monitor whether celery beat tasks are running """ try: - rs = redis.StrictRedis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) rs.setex( name='last_celery_beat_ping', time=current_app.config['LAST_CELERY_BEAT_PING_EXPIRATION_TIME'], @@ -64,7 +64,7 @@ def celery_available(): def celery_beat_available(): """Determines whether celery beat is available""" try: - rs = redis.from_url(current_app.config['REDIS_URL']) + rs = create_redis(current_app.config['REDIS_URL']) # Celery beat feeds scheduled jobs (a la cron) to the respective # job queues (standard and low priority). As a monitor, a job @@ -109,7 +109,7 @@ def redis_available(): # is available. Otherwise we assume # it's not available try: - rs = redis.from_url(current_app.config["REDIS_URL"]) + rs = create_redis(current_app.config["REDIS_URL"]) rs.ping() return True, 'Redis is available.' except Exception as e: diff --git a/portal/views/patient.py b/portal/views/patient.py index 8ec632baf..7ef1e9c16 100644 --- a/portal/views/patient.py +++ b/portal/views/patient.py @@ -30,9 +30,7 @@ from ..models.questionnaire_bank import QuestionnaireBank, trigger_date from ..models.questionnaire_response import QuestionnaireResponse from ..models.reference import Reference -from ..models.reporting import single_patient_adherence_data from ..models.research_study import ( - EMPRO_RS_ID, ResearchStudy, research_study_id_from_questionnaire ) @@ -332,6 +330,7 @@ def patient_timeline(patient_id): from ..models.questionnaire_bank import visit_name from ..models.questionnaire_response import aggregate_responses from ..models.research_protocol import ResearchProtocol + from ..tasks import cache_single_patient_adherence_data from ..trace import dump_trace, establish_trace user = get_user(patient_id, permission='view') @@ -467,9 +466,11 @@ def get_recur_id(qnr): if not adherence_data: # immediately following a cache purge, adherence data is gone and # needs to be recreated. - now = datetime.utcnow() - single_patient_adherence_data( - user, as_of_date=now, research_study_id=EMPRO_RS_ID) + kwargs = { + "patient_id": user.id, + "research_study_id": research_study_id, + } + cache_single_patient_adherence_data(**kwargs) adherence_data = sorted_adherence_data(patient_id, research_study_id) qnr_responses = aggregate_responses( diff --git a/tests/test_healthcheck.py b/tests/test_healthcheck.py index 7ace20aa4..66bc368c6 100644 --- a/tests/test_healthcheck.py +++ b/tests/test_healthcheck.py @@ -29,21 +29,21 @@ def test_celery_available_fails_when_celery_ping_fails( results = celery_available() assert results[0] is False - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_celery_beat_available_fails_when_redis_var_none( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.get.return_value = None + create_redis_mock.return_value.get.return_value = None results = celery_beat_available() assert results[0] is False - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_celery_beat_available_succeeds_when_redis_var_set( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.get.return_value = \ + create_redis_mock.return_value.get.return_value = \ str(datetime.now()) results = celery_beat_available() assert results[0] is True @@ -68,21 +68,21 @@ def test_postgresql_available_fails_when_query_exception( results = postgresql_available() assert results[0] is False - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_redis_available_succeeds_when_ping_successful( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.ping.return_value = True + create_redis_mock.return_value.ping.return_value = True results = redis_available() assert results[0] is True - @patch('portal.views.healthcheck.redis') + @patch('portal.views.healthcheck.create_redis') def test_redis_available_fails_when_ping_throws_exception( self, - redis_mock + create_redis_mock ): - redis_mock.from_url.return_value.ping.side_effect = \ + create_redis_mock.return_value.ping.side_effect = \ redis.ConnectionError() results = redis_available() assert results[0] is False diff --git a/tests/test_intervention.py b/tests/test_intervention.py index 69efd1070..a29fd3214 100644 --- a/tests/test_intervention.py +++ b/tests/test_intervention.py @@ -555,6 +555,7 @@ def test_in_role(initialize_static, test_user): assert sm.quick_access_check(user) +@pytest.mark.skip("no longer supporting moving initial trigger dates") def test_card_html_update( client, initialize_static, initialized_patient_logged_in): """Confirm assessment status state affects AE card on /home view""" diff --git a/tests/test_qb_timeline.py b/tests/test_qb_timeline.py index 19e0b9293..2aba026f5 100644 --- a/tests/test_qb_timeline.py +++ b/tests/test_qb_timeline.py @@ -309,7 +309,7 @@ def test_qb_post_consent_change(self): assert qbstatus.overall_status == OverallStatus.completed def test_withdrawn(self): - # qbs should halt beyond withdrawal + # check qb_status post withdrawal crv = self.setup_org_qbs() crv_id = crv.id # consent 17 months in past @@ -334,8 +334,9 @@ def test_withdrawn(self): for n in (3, 6, 9, 15): assert visit_name(next(gen)) == 'Month {}'.format(n) - with pytest.raises(StopIteration): - next(gen) + # current should be withdrawn, subsequent avail in case + # post withdrawn results come in + assert visit_name(next(gen)) == 'Month 18' # Confirm withdrawn user can still access "current" # as needed for reporting @@ -343,9 +344,10 @@ def test_withdrawn(self): qb_stats = QB_Status( user=user, research_study_id=0, - as_of_date=now) + as_of_date=now+relativedelta(days=1)) current = qb_stats.current_qbd(even_if_withdrawn=True) assert current + assert qb_stats.overall_status == OverallStatus.withdrawn def test_change_midstream_rp(self): back7, nowish = associative_backdate( diff --git a/tests/test_reporting.py b/tests/test_reporting.py index a81c39ff5..0d1675272 100644 --- a/tests/test_reporting.py +++ b/tests/test_reporting.py @@ -3,6 +3,7 @@ from datetime import datetime from dateutil.relativedelta import relativedelta from flask_webtest import SessionScope +from time import sleep from portal.cache import cache from portal.extensions import db @@ -150,16 +151,40 @@ def test_adherence_sort(self): "completion_date ": "19 - Jun - 2023 07: 42:46 ", "oow_completion_date": "" }, + "Month 12 post-withdrawn": { + "qb": "CRV Baseline v2", + "site": "CRV", + "visit": "Month 12", + "status": "Completed", + "consent": "20 - May - 2023 07: 42:46 ", + "completion_date ": "25 - Jun - 2023 00:00:00 ", + "country ": None, + "user_id ": 3, + "study_id": "study user 3", + "site_code": ""}, "Month 12": { "qb": "CRV Baseline v2", "site": "CRV", "visit": "Month 12", - "status": "Overdue", + "status": "Withdrawn", + "completion_date ": "22 - Jun - 2023 00:00:00 ", "consent": "20 - May - 2023 07: 42:46 ", "country ": None, "user_id ": 3, "study_id": "study user 3", "site_code": ""}, + "Baseline post-withdrawn": { + "qb": "CRV Baseline v2", + "site": "CRV", + "visit": "Baseline", + "status": "Completed", + "completion_date ": "22 - Jun - 2023 00:00:00 ", + "consent": "19 - Jun - 2023 07: 42:46", + "country ": None, + "user_id ": 2, + "study_id": "study user 2", + "site_code": "" + }, "Baseline": { "qb": "CRV Baseline v2", "site": "CRV", @@ -173,10 +198,17 @@ def test_adherence_sort(self): }, } results = sort_by_visit_key(sort_me) - assert len(results) == 3 + assert len(results) == 5 assert results[0]["visit"] == "Baseline" - assert results[1]["visit"] == "Month 3" - assert results[2]["visit"] == "Month 12" + assert results[0]["status"] == "Due" + assert results[1]["visit"] == "Baseline" + assert results[1]["status"] == "Completed" + assert results[2]["visit"] == "Month 3" + assert results[2]["status"] == "Completed" + assert results[3]["visit"] == "Month 12" + assert results[3]["status"] == "Withdrawn" + assert results[4]["visit"] == "Month 12" + assert results[4]["status"] == "Completed" def populate_adherence_cache(self, test_users): """helper method to bring current test user state into adherence cache""" @@ -298,6 +330,7 @@ def test_results(self): self.consent_with_org(org_id=org_id) self.login() self.populate_adherence_cache(test_users=(user2, user3, user4)) + sleep(5) # as adherence jobs run independently, give em time response = self.results_from_async_call( "/api/report/questionnaire_status", timeout=10)