From 1d68f28a516f73421d3e9605c57f38b080f86ef9 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 6 Sep 2023 15:53:38 -0700 Subject: [PATCH 01/13] TN-3236, sequential hard trigger counts kept in trigger_states.triggers, not previous answers! --- portal/trigger_states/empro_domains.py | 17 +++++++++++------ portal/trigger_states/empro_states.py | 15 ++++++++++----- tests/test_trigger_states.py | 20 ++++++++++++++++---- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/portal/trigger_states/empro_domains.py b/portal/trigger_states/empro_domains.py index d60b69d91..d5167c4b4 100644 --- a/portal/trigger_states/empro_domains.py +++ b/portal/trigger_states/empro_domains.py @@ -28,7 +28,7 @@ class DomainTriggers(object): """ def __init__( - self, domain, current_answers, previous_answers, initial_answers): + self, domain, current_answers, previous_answers, initial_answers, previous_triggers): self.domain = domain self._triggers = dict() @@ -38,6 +38,9 @@ def __init__( self.previous_answers = previous_answers or dict() self.initial_answers = initial_answers or dict() + # Trigger state triggers from previous month, if defined + self.previous_triggers = previous_triggers + @property def triggers(self): self.eval() @@ -92,9 +95,9 @@ def eval(self): sequential_hard_trigger_count = 1 if ( sequential_hard_trigger_count and - self.previous_answers and - sequential_hard_trigger_count_key in self.previous_answers): - sequential_hard_trigger_count = self.previous_answers[sequential_hard_trigger_count_key] + 1 + self.previous_triggers and + sequential_hard_trigger_count_key in self.previous_triggers): + sequential_hard_trigger_count = self.previous_triggers[sequential_hard_trigger_count_key] + 1 self._triggers[sequential_hard_trigger_count_key] = sequential_hard_trigger_count @@ -145,17 +148,19 @@ def obtain_observations(self, qnr): results[domain][link_id] = (int(score), severity) setattr(self, f"{timepoint}_obs", results) - def eval_triggers(self): + def 
eval_triggers(self, previous_triggers): triggers = dict() triggers['domain'] = dict() for domain in EMPRO_DOMAINS: if domain in self.cur_obs: + prev_triggers_for_domain = previous_triggers["domain"][domain] if previous_triggers else None dt = DomainTriggers( domain=domain, current_answers=self.cur_obs[domain], previous_answers=self.prev_obs.get(domain), - initial_answers=self.initial_obs.get(domain) + initial_answers=self.initial_obs.get(domain), + previous_triggers=prev_triggers_for_domain, ) triggers['domain'][domain] = dt.triggers diff --git a/portal/trigger_states/empro_states.py b/portal/trigger_states/empro_states.py index a2c73e52f..4ac05204f 100644 --- a/portal/trigger_states/empro_states.py +++ b/portal/trigger_states/empro_states.py @@ -210,9 +210,18 @@ def evaluate_triggers(qnr): ts = users_trigger_state(qnr.subject_id) sm = EMPRO_state(ts) + # include previous month resolved row, if available + previous = TriggerState.query.filter( + TriggerState.user_id == qnr.subject_id).filter( + TriggerState.state == 'resolved').order_by( + TriggerState.timestamp.desc()).first() + # bring together and evaluate available data for triggers dm = DomainManifold(qnr) - ts.triggers = dm.eval_triggers() + previous_triggers = ( + previous if previous and previous.visit_month + 1 == ts.visit_month + else None) + ts.triggers = dm.eval_triggers(previous_triggers) ts.questionnaire_response_id = qnr.id # transition and persist state @@ -225,10 +234,6 @@ def evaluate_triggers(qnr): # a submission closes the window of availability for the # post-intervention clinician follow up. 
mark state if # one is found - previous = TriggerState.query.filter( - TriggerState.user_id == qnr.subject_id).filter( - TriggerState.state == 'resolved').order_by( - TriggerState.timestamp.desc()).first() if previous and previous.triggers.get('action_state') not in ( 'completed', 'missed', 'not applicable', 'withdrawn'): triggers = copy.deepcopy(previous.triggers) diff --git a/tests/test_trigger_states.py b/tests/test_trigger_states.py index d15b060b3..de471e5b4 100644 --- a/tests/test_trigger_states.py +++ b/tests/test_trigger_states.py @@ -127,6 +127,15 @@ def test_2nd_eval( def test_cur_hard_trigger(): # Single result with a severe should generate a hard (and soft) trigger + + # include a previous triggered state to test sequential count + previous_triggers = { + "ironman_ss.11": "hard", + "ironman_ss.12": "hard", + "ironman_ss.13": "hard", + "_sequential_hard_trigger_count": 3, + } + dt = DomainTriggers( domain='anxious', current_answers={ @@ -134,10 +143,11 @@ def test_cur_hard_trigger(): 'ironman_ss.11': ('2', None), 'ironman_ss.13': ('4', 'penultimate')}, previous_answers=None, - initial_answers=None) + initial_answers=None, + previous_triggers=previous_triggers) assert len([k for k in dt.triggers.keys() if not k.startswith('_')]) == 1 assert 'ironman_ss.13' in dt.triggers - assert dt.triggers[sequential_hard_trigger_count_key] == 1 + assert dt.triggers[sequential_hard_trigger_count_key] == 4 def test_worsening_soft_trigger(): @@ -147,7 +157,8 @@ def test_worsening_soft_trigger(): previous_answers={'ss.21': (2, None), 'ss.15': (2, None)}, current_answers={ 'ss.15': (3, None), 'ss.12': (3, None), 'ss.21': (1, None)}, - initial_answers=None) + initial_answers=None, + previous_triggers=None) assert len([k for k in dt.triggers.keys() if not k.startswith('_')]) == 1 assert dt.triggers['ss.15'] == 'soft' assert dt.triggers[sequential_hard_trigger_count_key] == 0 @@ -162,7 +173,8 @@ def test_worsening_baseline(): domain='anxious', initial_answers=initial_answers, 
previous_answers=previous_answers, - current_answers=current_answers) + current_answers=current_answers, + previous_triggers=None) assert len([k for k in dt.triggers.keys() if not k.startswith('_')]) == 2 assert dt.triggers['12'] == dt.triggers['21'] == 'hard' From e6f4c3295ee0890eeafa46dd599190b1b21f1e96 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 20 Sep 2023 14:49:29 -0700 Subject: [PATCH 02/13] expand migration downgrade step, to remove all sequential hard domain counts, so the test db can be switched over to a hotfix without this migration. --- portal/migrations/versions/80c3b1e96c45_.py | 31 ++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/portal/migrations/versions/80c3b1e96c45_.py b/portal/migrations/versions/80c3b1e96c45_.py index fee8ec078..304fdc509 100644 --- a/portal/migrations/versions/80c3b1e96c45_.py +++ b/portal/migrations/versions/80c3b1e96c45_.py @@ -81,4 +81,33 @@ def upgrade(): def downgrade(): - pass # no value in removing + # for each active EMPRO patient with at least 1 hard triggered domain, + # remove any sequential counts found + bind = op.get_bind() + session = Session(bind=bind) + + patient_ids = [] + for patient_id in session.execute( + "SELECT DISTINCT(user_id) FROM trigger_states JOIN users" + " ON users.id = user_id WHERE deleted_id IS NULL"): + patient_ids.append(patient_id[0]) + + output = StringIO() + for pid in patient_ids: + output.write(f"\n\nPatient: {pid}\n") + trigger_states = db.session.query(TriggerState).filter( + TriggerState.user_id == pid).filter( + TriggerState.state == "resolved").order_by( + TriggerState.timestamp.asc()) + for ts in trigger_states: + improved_triggers = deepcopy(ts.triggers) + for d in EMPRO_DOMAINS: + if sequential_hard_trigger_count_key in improved_triggers["domain"][d]: + del improved_triggers["domain"][d][sequential_hard_trigger_count_key] + output.write(f" removed sequential from {ts.visit_month}:{d} {improved_triggers['domain'][d]}\n") + + # retain 
triggers now containing sequential counts + ts.triggers = improved_triggers + + db.session.commit() + print(output.getvalue()) From c419436a184e52b472e97a363e10ca4060c3e24c Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Thu, 16 Nov 2023 17:14:38 -0800 Subject: [PATCH 03/13] add debugging info to exception text --- portal/models/qb_timeline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/portal/models/qb_timeline.py b/portal/models/qb_timeline.py index c263fb381..b4fb8de66 100644 --- a/portal/models/qb_timeline.py +++ b/portal/models/qb_timeline.py @@ -374,7 +374,8 @@ def qbds_for_rp(rp, classification, trigger_date): ) if curRPD.retired == nextRPD.retired: raise ValueError( - "Invalid state: multiple RPs w/ same retire date") + "Invalid state: multiple RPs w/ same retire date: " + f"{next_rp} : {curRPD.retired}") else: nextRPD = None yield curRPD, nextRPD From 2e67aef1c71f9657a571845fa76fadd165509776 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 29 Nov 2023 15:26:57 -0800 Subject: [PATCH 04/13] add exception to catch details for bogus records --- portal/migrations/versions/80c3b1e96c45_.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/portal/migrations/versions/80c3b1e96c45_.py b/portal/migrations/versions/80c3b1e96c45_.py index 304fdc509..d6a6bf41d 100644 --- a/portal/migrations/versions/80c3b1e96c45_.py +++ b/portal/migrations/versions/80c3b1e96c45_.py @@ -102,6 +102,8 @@ def downgrade(): for ts in trigger_states: improved_triggers = deepcopy(ts.triggers) for d in EMPRO_DOMAINS: + if d not in improved_triggers["domain"]: + raise RuntimeError(f"{d} missing from {ts.visit_month} for {patient_id}") if sequential_hard_trigger_count_key in improved_triggers["domain"][d]: del improved_triggers["domain"][d][sequential_hard_trigger_count_key] output.write(f" removed sequential from {ts.visit_month}:{d} {improved_triggers['domain'][d]}\n") From 8b456b3be6d9ab9406e0ea9d81424aed7eae4898 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: 
Wed, 29 Nov 2023 15:27:59 -0800 Subject: [PATCH 05/13] add `sanity_check()` to /timewarp - confirm invariants before and after moving patient timeline. --- portal/views/patient.py | 43 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/portal/views/patient.py b/portal/views/patient.py index 7ef1e9c16..f4c90f2c5 100644 --- a/portal/views/patient.py +++ b/portal/views/patient.py @@ -4,6 +4,7 @@ for staff """ +from collections import defaultdict from datetime import datetime import json @@ -367,6 +368,10 @@ def patient_timeline(patient_id): except ValueError as ve: abort(500, str(ve)) + consents = [ + {"research_study_id": c.research_study_id, + "acceptance_date": c.acceptance_date, + "status": c.status} for c in user.valid_consents] results = [] # We order by at (to get the latest status for a given QB) and # secondly by id, as on rare occasions, the time (`at`) of @@ -543,7 +548,41 @@ def patient_timewarp(patient_id, days): from copy import deepcopy from portal.models.questionnaire_response import QuestionnaireResponse from portal.models.user_consent import UserConsent - + from ..trigger_states.models import TriggerState + + def sanity_check(): + """confirm user state before / after timewarp""" + # User should have one valid consent + patient = get_user(patient_id, 'view') + consents = patient.valid_consents + rps = [c for c in consents if c.research_study_id == 0] + assert len(rps) == 1 + + rp1s = [c for c in consents if c.research_study_id == 1] + if not len(rp1s): + return + assert len(rp1s) == 1 + + # Confirm valid trigger_states. No data prior to consent. 
+ ts = TriggerState.query.filter( + TriggerState.user_id == patient_id, + TriggerState.timestamp < rp1s[0].acceptance_date).count() + assert ts == 0 + + # should never be more than a single row for any given state + ts = TriggerState.query.filter( + TriggerState.user_id == patient_id + ) + data = defaultdict(int) + for row in ts: + key = f"{row.visit_month}:{row.state}" + data[key] += 1 + for k, v in data.items(): + if v > 1: + raise RuntimeError( + f"Unique visit_month:state {k} broken in trigger_states for {patient}") + + sanity_check() if current_app.config['SYSTEM_TYPE'] == "production": abort(404) @@ -569,7 +608,6 @@ def patient_timewarp(patient_id, days): # trigger_state if current_app.config['GIL'] is None: - from ..trigger_states.models import TriggerState for ts in TriggerState.query.filter( TriggerState.user_id == user.id): changed.append(f"trigger_state {ts.id}") @@ -603,6 +641,7 @@ def patient_timewarp(patient_id, days): ar.timestamp = ar.timestamp - delta db.session.commit() + sanity_check() # Recalculate users timeline & qnr associations cache.delete_memoized(trigger_date) From 98fb482e38c0102189a5d8acd3b12638a8566d4f Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 24 Jan 2024 19:25:24 -0800 Subject: [PATCH 06/13] rework alembic migration order to reapply hard trigger migration again --- portal/migrations/versions/d1f3ed8d16ef_.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/portal/migrations/versions/d1f3ed8d16ef_.py b/portal/migrations/versions/d1f3ed8d16ef_.py index 133639bf0..531bc8604 100644 --- a/portal/migrations/versions/d1f3ed8d16ef_.py +++ b/portal/migrations/versions/d1f3ed8d16ef_.py @@ -11,7 +11,7 @@ # revision identifiers, used by Alembic. 
revision = 'd1f3ed8d16ef' -down_revision = '80c3b1e96c45' +down_revision = '2e9b9e696bb8' def upgrade(): From ae9215832fdad6bfbfc9211dc373298700d34953 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Thu, 25 Jan 2024 15:31:30 -0800 Subject: [PATCH 07/13] refactor to reuse same migration again, with patched code in place --- portal/migrations/versions/80c3b1e96c45_.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/portal/migrations/versions/80c3b1e96c45_.py b/portal/migrations/versions/80c3b1e96c45_.py index d6a6bf41d..3e6ed5d0c 100644 --- a/portal/migrations/versions/80c3b1e96c45_.py +++ b/portal/migrations/versions/80c3b1e96c45_.py @@ -19,12 +19,19 @@ # revision identifiers, used by Alembic. revision = '80c3b1e96c45' -down_revision = '2e9b9e696bb8' +down_revision = '66368e673005' Session = sessionmaker() def upgrade(): + # Add sequential counts to appropriate trigger_states rows. + + # this migration was applied once before, but the code wasn't correctly + # maintaining the sequential counts. start by removing all for a clean + # slate via the same `downgrade()` step + downgrade() + # for each active EMPRO patient with at least 1 hard triggered domain, # walk through their monthly reports, adding the sequential count for # the opt-out feature. 
@@ -103,7 +110,7 @@ def downgrade(): improved_triggers = deepcopy(ts.triggers) for d in EMPRO_DOMAINS: if d not in improved_triggers["domain"]: - raise RuntimeError(f"{d} missing from {ts.visit_month} for {patient_id}") + raise RuntimeError(f"{d} missing from {ts.visit_month} for {pid}") if sequential_hard_trigger_count_key in improved_triggers["domain"][d]: del improved_triggers["domain"][d][sequential_hard_trigger_count_key] output.write(f" removed sequential from {ts.visit_month}:{d} {improved_triggers['domain'][d]}\n") From b57f478e878a73c17ef36545837b666dd2ac81d5 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 13 Mar 2024 16:10:37 -0700 Subject: [PATCH 08/13] refactor email generation to common empro_messages module. --- portal/trigger_states/empro_messages.py | 26 +++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/portal/trigger_states/empro_messages.py b/portal/trigger_states/empro_messages.py index 0ad1c5015..dbd16521c 100644 --- a/portal/trigger_states/empro_messages.py +++ b/portal/trigger_states/empro_messages.py @@ -2,7 +2,9 @@ from datetime import datetime from flask import current_app, url_for from flask_babel import gettext as _ +from smtplib import SMTPRecipientsRefused +from portal.database import db from portal.models.app_text import MailResource, app_text from portal.models.communication import EmailMessage, load_template_args from portal.models.organization import UserOrganization @@ -11,6 +13,30 @@ from portal.models.qb_status import QB_Status +def invite_email(user): + if not user.email_ready(): + current_app.logger.error(f"{user.id} can't receive EMPRO invite email") + return + args = load_template_args(user=user) + item = MailResource( + app_text("patient invite email IRONMAN EMPRO Study"), + locale_code=user.locale_code, + variables=args) + msg = EmailMessage( + subject=item.subject, + body=item.body, + recipients=user.email, + sender=current_app.config['MAIL_DEFAULT_SENDER'], + user_id=user.id) + try: + 
msg.send_message() + except SMTPRecipientsRefused as exc: + current_app.logger.error( + "Error sending EMPRO Invite to %s: %s", + user.email, exc) + db.session.add(msg) + + def patient_email(patient, soft_triggers, hard_triggers): """Prepare email for patient, depending on trigger status""" From f9afc90789512d1313173de0ca3926772bbd7fef Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 13 Mar 2024 16:11:24 -0700 Subject: [PATCH 09/13] progress towards regenerating a user's trigger_states table on EMPRO consent change. --- portal/models/qb_status.py | 13 +++-- portal/trigger_states/empro_states.py | 58 +++++++++----------- portal/trigger_states/models.py | 77 +++++++++++++++++++++++++++ portal/views/user.py | 6 +++ 4 files changed, 117 insertions(+), 37 deletions(-) diff --git a/portal/models/qb_status.py b/portal/models/qb_status.py index c6cf1e774..cd6b755c8 100644 --- a/portal/models/qb_status.py +++ b/portal/models/qb_status.py @@ -520,7 +520,8 @@ def warn_on_duplicate_request(self, requested_set): f" {requested_indef} already!") -def patient_research_study_status(patient, ignore_QB_status=False): +def patient_research_study_status( + patient, ignore_QB_status=False, as_of_date=None, skip_initiate=False): """Returns details regarding patient readiness for available studies Wraps complexity of checking multiple QB_Status and ResearchStudy @@ -532,6 +533,8 @@ def patient_research_study_status(patient, ignore_QB_status=False): :param patient: subject to check :param ignore_QB_status: set to prevent recursive call, if used during process of evaluating QB_status. Will restrict results to eligible + :param as_of_date: set to check status at alternative time + :param skip_initiate: set only when rebuilding to avoid state change :returns: dictionary of applicable studies keyed by research_study_id. 
Each contains a dictionary with keys: - eligible: set True if assigned to research study and pre-requisites @@ -546,7 +549,8 @@ def patient_research_study_status(patient, ignore_QB_status=False): """ from datetime import datetime from .research_study import EMPRO_RS_ID, ResearchStudy - as_of_date = datetime.utcnow() + if as_of_date is None: + as_of_date = datetime.utcnow() results = {} # check studies in required order - first found with pending work @@ -601,7 +605,8 @@ def patient_research_study_status(patient, ignore_QB_status=False): elif rs_status['ready']: # As user may have just entered ready status on EMPRO # move trigger_states.state to due - from ..trigger_states.empro_states import initiate_trigger - initiate_trigger(patient.id) + if not skip_initiate: + from ..trigger_states.empro_states import initiate_trigger + initiate_trigger(patient.id) return results diff --git a/portal/trigger_states/empro_states.py b/portal/trigger_states/empro_states.py index 4ac05204f..52fd0066e 100644 --- a/portal/trigger_states/empro_states.py +++ b/portal/trigger_states/empro_states.py @@ -12,13 +12,10 @@ from statemachine.exceptions import TransitionNotAllowed from .empro_domains import DomainManifold -from .empro_messages import patient_email, staff_emails +from .empro_messages import invite_email, patient_email, staff_emails from .models import TriggerState from ..database import db from ..date_tools import FHIR_datetime -from ..models.app_text import MailResource, app_text -from ..models.communication import load_template_args -from ..models.message import EmailMessage from ..models.qb_status import QB_Status from ..models.qbd import QBD from ..models.questionnaire_bank import QuestionnaireBank @@ -81,7 +78,7 @@ class EMPRO_state(StateMachine): next_available = resolved.to(due) -def users_trigger_state(user_id): +def users_trigger_state(user_id, as_of_date=None): """Obtain the latest trigger state for given user Returns latest TriggerState row for user or creates 
transient if not @@ -95,11 +92,21 @@ def users_trigger_state(user_id): - triggered: triggers available in TriggerState.triggers attribute """ - ts = TriggerState.query.filter( + if as_of_date is None: + as_of_date = datetime.utcnow() + + rows = TriggerState.query.filter( TriggerState.user_id == user_id).order_by( - TriggerState.timestamp.desc()).first() + TriggerState.timestamp.desc()) + for ts_row in rows: + # most recent with a timestamp prior to as_of_date, in case this is a rebuild + if as_of_date < ts_row.timestamp: + continue + ts = ts_row + break + if not ts: - ts = TriggerState(user_id=user_id, state='unstarted') + ts = TriggerState(user_id=user_id, state='unstarted', timestamp=as_of_date) return ts @@ -115,19 +122,22 @@ def lookup_visit_month(user_id, as_of_date): return one_index - 1 -def initiate_trigger(user_id): +def initiate_trigger(user_id, as_of_date=None, rebuilding=False): """Call when EMPRO becomes available for user or next is due""" + if as_of_date is None: + as_of_date = datetime.utcnow() + ts = users_trigger_state(user_id) if ts.state == 'due': # Possible the user took no action, as in skipped the last month # (or multiple months may have been skipped if time-warping). # If so, the visit_month and timestamp are updated on the last # `due` row that was found above. 
- visit_month = lookup_visit_month(user_id, datetime.utcnow()) + visit_month = lookup_visit_month(user_id, as_of_date) if ts.visit_month != visit_month: current_app.logger.warn(f"{user_id} skipped EMPRO visit {ts.visit_month}") ts.visit_month = visit_month - ts.timestamp = datetime.utcnow() + ts.timestamp = as_of_date db.session.commit() # Allow idempotent call - skip out if in correct state @@ -143,7 +153,7 @@ def initiate_trigger(user_id): next_visit = int(ts.visit_month) + 1 current_app.logger.debug(f"transition from {ts} to next due") # generate a new ts, to leave resolved record behind - ts = TriggerState(user_id=user_id, state='unstarted') + ts = TriggerState(user_id=user_id, state='unstarted', as_of_date=as_of_date) ts.visit_month = next_visit current_app.logger.debug( "persist-trigger_states-new from initiate_trigger(), " @@ -159,27 +169,9 @@ def initiate_trigger(user_id): "persist-trigger_states-new from initiate_trigger()," f"record historical clause {ts}") - # TN-2863 auto send invite when first available - if ts.visit_month == 0: - user = User.query.get(user_id) - args = load_template_args(user=user) - item = MailResource( - app_text("patient invite email IRONMAN EMPRO Study"), - locale_code=user.locale_code, - variables=args) - msg = EmailMessage( - subject=item.subject, - body=item.body, - recipients=user.email, - sender=current_app.config['MAIL_DEFAULT_SENDER'], - user_id=user_id) - try: - msg.send_message() - except SMTPRecipientsRefused as exc: - current_app.logger.error( - "Error sending EMPRO Invite to %s: %s", - user.email, exc) - db.session.add(msg) + # TN-2863 auto send invite when first available, unless rebuilding + if ts.visit_month == 0 and not rebuilding: + invite_email(User.query.get(user_id)) db.session.commit() return ts diff --git a/portal/trigger_states/models.py b/portal/trigger_states/models.py index c91402fcb..5934a9c79 100644 --- a/portal/trigger_states/models.py +++ b/portal/trigger_states/models.py @@ -1,4 +1,5 @@ from datetime 
import datetime, timedelta +from flask import current_app from sqlalchemy.dialects.postgresql import ENUM, JSONB from sqlalchemy.orm import make_transient @@ -254,3 +255,79 @@ def soft_triggers_for_visit(self, visit_month): ts = self.latest_by_visit[visit_month] if ts: return ts.soft_trigger_list() + + +def rebuild_trigger_states(patient): + """If a user's consent moves, need to re-build the trigger states for user + + Especially messy process, as much of the data lives in the trigger_states + table alone, and a consent change may modify start eligibility, etc. + """ + from .empro_states import initiate_trigger + from ..models.overall_status import OverallStatus + from ..models.qb_status import patient_research_study_status + from ..models.qb_timeline import QBT, update_users_QBT + from ..models.research_study import BASE_RS_ID, EMPRO_RS_ID + + # Use the timeline data for accurate start dates, etc. + update_users_QBT(user_id=patient.id, research_study_id=EMPRO_RS_ID) + tl_query = QBT.query.filter(QBT.user_id == patient.id).filter( + QBT.research_study_id == EMPRO_RS_ID).order_by(QBT.id) + if not tl_query.count(): + # User has no timeline data for EMPRO, likely not eligible + if TriggerState.query.filter(TriggerState.user_id == patient.id).count(): + current_app.logging.error(f"no EMPRO timeline, yet trigger_states rows for {patient.id}") + return + + # Capture state in memory for potential reuse when rebuilding + data = [] + for row in TriggerState.query.filter( + TriggerState.user_id == patient.id).order_by(TriggerState.id): + data.append({ + 'id': row.id, + 'state': row.state, + 'timestamp': row.timestamp, + 'questionnaire_response_id': row.questionnaire_response_id, + 'triggers': row.triggers, + 'visit_month': row.visit_month, + }) + + if not data: + # no trigger state data to move, no problem. 
+ return + + # purge rows and rebuild below + # TODO TriggerState.delete and rebuild + raise NotImplemented(f"can not adjust trigger_states for {patient.id}") + + if len([c for c in patient.clinicians]) == 0: + # No valid trigger states without a clinician + return + + visit_month = -1 + for row in tl_query: + if row.status == OverallStatus.due: + # reset any state for next visit: + visit_month += 1 + conclude_as_expired = False + + # 'due' row starts when they became eligible + as_of_date = row.at + study_status = patient_research_study_status( + patient=patient, as_of_date=as_of_date, skip_initiate=True) + + if study_status[BASE_RS_ID]['ready']: + # user had unfinished global work at said point in time. + # check if they complete before the current visit expires + basestudy_query = QBT.query.filter(QBT.user_id == patient.id).filter( + QBT.research_study_id == BASE_RS_ID).filter( + QBT.at.between(as_of_date, as_of_date+timedelta(days=30))).filter( + QBT.status == OverallStatus.completed).first() + if basestudy_query: + # user finished global work on time, start the visit then + as_of_date = basestudy_query.as_of_date + else: + # user was never able to submit visit for given empro month. + # stick with initial start date and let it resolve as expired + conclude_as_expired = True + initiate_trigger(patient.id, as_of_date=as_of_date, rebuilding=True) diff --git a/portal/views/user.py b/portal/views/user.py index 5acce26ea..87c33460f 100644 --- a/portal/views/user.py +++ b/portal/views/user.py @@ -749,6 +749,12 @@ def set_user_consents(user_id): # status - invalidate this user's data at this time. 
invalidate_users_QBT( user_id=user.id, research_study_id=consent.research_study_id) + + # If user has submitted EMPRO - those must also be recalculated + if consent.research_study_id == EMPRO_RS_ID: + from ..trigger_states.models import rebuild_trigger_states + rebuild_trigger_states(user) + except ValueError as e: abort(400, str(e)) From 0facf6e644fc4121ff17410938ac24936fb5d047 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 13 Mar 2024 16:12:58 -0700 Subject: [PATCH 10/13] correct order of migrations with work done outside branch. log rather than blow up, when finding missing SDC observations. --- portal/migrations/versions/80c3b1e96c45_.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/portal/migrations/versions/80c3b1e96c45_.py b/portal/migrations/versions/80c3b1e96c45_.py index 3e6ed5d0c..b20cfec5b 100644 --- a/portal/migrations/versions/80c3b1e96c45_.py +++ b/portal/migrations/versions/80c3b1e96c45_.py @@ -1,7 +1,7 @@ """Add sequential hard trigger count to EMPRO trigger_states.triggers domains. Revision ID: 80c3b1e96c45 -Revises: 2e9b9e696bb8 +Revises: 3c871e710277 Create Date: 2023-07-24 17:08:35.128975 """ @@ -9,6 +9,7 @@ from copy import deepcopy from alembic import op from io import StringIO +import logging from sqlalchemy.orm import sessionmaker from portal.database import db from portal.trigger_states.empro_domains import ( @@ -19,10 +20,13 @@ # revision identifiers, used by Alembic. revision = '80c3b1e96c45' -down_revision = '66368e673005' +down_revision = '3c871e710277' Session = sessionmaker() +log = logging.getLogger("alembic.runtime.migration") +log.setLevel(logging.DEBUG) + def upgrade(): # Add sequential counts to appropriate trigger_states rows. 
@@ -61,9 +65,10 @@ def upgrade(): for d in EMPRO_DOMAINS: sequential_hard_for_this_domain = 0 if d not in improved_triggers["domain"]: - # only seen on test, fill in the missing domain - print(f"missing {d} in {pid}:{ts.visit_month}?") - improved_triggers["domain"][d] = {} + # shouldn't happen, SDC typically includes all domains + # but a few records are lacking + log.warning(f"{pid} missing domain {d} in {ts.visit_month} response") + continue if any(v == "hard" for v in improved_triggers["domain"][d].values()): sequential_by_domain[d].append(ts.visit_month) @@ -110,7 +115,8 @@ def downgrade(): improved_triggers = deepcopy(ts.triggers) for d in EMPRO_DOMAINS: if d not in improved_triggers["domain"]: - raise RuntimeError(f"{d} missing from {ts.visit_month} for {pid}") + log.warning(f"{d} missing from {ts.id}(month: {ts.visit_month}) for {pid}") + continue if sequential_hard_trigger_count_key in improved_triggers["domain"][d]: del improved_triggers["domain"][d][sequential_hard_trigger_count_key] output.write(f" removed sequential from {ts.visit_month}:{d} {improved_triggers['domain'][d]}\n") From 85e635a6b3fd2aca3e02a861da6d09e37036990e Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Wed, 13 Mar 2024 16:53:30 -0700 Subject: [PATCH 11/13] allow for minor EMPRO consent changes and withdrawal calls w/o raising errors. improve logging when trigger_states become out of sync with qb_timeline due to consent change. 
--- portal/trigger_states/models.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/portal/trigger_states/models.py b/portal/trigger_states/models.py index 5934a9c79..827cef072 100644 --- a/portal/trigger_states/models.py +++ b/portal/trigger_states/models.py @@ -268,6 +268,7 @@ def rebuild_trigger_states(patient): from ..models.qb_status import patient_research_study_status from ..models.qb_timeline import QBT, update_users_QBT from ..models.research_study import BASE_RS_ID, EMPRO_RS_ID + from ..models.user_consent import consent_withdrawal_dates # Use the timeline data for accurate start dates, etc. update_users_QBT(user_id=patient.id, research_study_id=EMPRO_RS_ID) @@ -296,8 +297,19 @@ def rebuild_trigger_states(patient): # no trigger state data to move, no problem. return + consent_date, wd_date = consent_withdrawal_dates(patient, EMPRO_RS_ID) + month0_dues = [d for d in data if d.visit_month == 0 and d.state == 'due'] + if len(month0_dues) != 1: + raise ValueError(f"{patient.id} failed to find trigger_states due row for month 0") + if (consent_date < month0_dues[0].timestamp) and ( + month0_dues[0].timestamp < consent_date + timedelta(days=30)): + # if the user's month 0 due is within 30 days of consent, don't shift + return + # purge rows and rebuild below # TODO TriggerState.delete and rebuild + current_app.logging.error( + f"{patient.id} trigger_states out of sync with qb_timeline; requires attention!") raise NotImplemented(f"can not adjust trigger_states for {patient.id}") if len([c for c in patient.clinicians]) == 0: From 20490b71b902b5590d4857be2521cc4b0fe9f941 Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Thu, 14 Mar 2024 16:05:21 -0700 Subject: [PATCH 12/13] minor refactor typo; initialize variable only set on a match before testing --- portal/trigger_states/empro_states.py | 1 + 1 file changed, 1 insertion(+) diff --git a/portal/trigger_states/empro_states.py b/portal/trigger_states/empro_states.py index 52fd0066e..19089b956 100644 
--- a/portal/trigger_states/empro_states.py +++ b/portal/trigger_states/empro_states.py @@ -95,6 +95,7 @@ def users_trigger_state(user_id, as_of_date=None): if as_of_date is None: as_of_date = datetime.utcnow() + ts = None rows = TriggerState.query.filter( TriggerState.user_id == user_id).order_by( TriggerState.timestamp.desc()) From 1aaf5a23c2780426412d3d0c4b39547dd6a6b81a Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Fri, 15 Mar 2024 07:21:15 -0700 Subject: [PATCH 13/13] alert on production, purge on dev/test, problem trigger_states rows from consent changes. --- portal/migrations/versions/80c3b1e96c45_.py | 36 +++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/portal/migrations/versions/80c3b1e96c45_.py b/portal/migrations/versions/80c3b1e96c45_.py index b20cfec5b..282ac6f07 100644 --- a/portal/migrations/versions/80c3b1e96c45_.py +++ b/portal/migrations/versions/80c3b1e96c45_.py @@ -9,6 +9,7 @@ from copy import deepcopy from alembic import op from io import StringIO +from flask import current_app import logging from sqlalchemy.orm import sessionmaker from portal.database import db @@ -28,6 +29,32 @@ log.setLevel(logging.DEBUG) +def validate_users_trigger_states(session, patient_id): + """Confirm user has sequential visits in trigger states table. + + Due to allowance of moving EMPRO consents and no previous checks, + some users on test have invalid overlapping trigger states rows. 
+ """ + ts_rows = session.query(TriggerState).filter( + TriggerState.user_id == patient_id).order_by(TriggerState.id) + month_counter = -1 + for row in ts_rows: + if row.state == 'due': + # skipping months is okay, but every due should be sequentially greater than previous + if month_counter >= row.visit_month: + raise ValueError(f"{patient_id} expected month > {month_counter}, got {row.visit_month}") + month_counter = row.visit_month + else: + # states other than 'due' should be grouped together with same visit_month + if month_counter != row.visit_month: + raise ValueError(f"{patient_id} expected month {month_counter}, got {row.visit_month}") + +def purge_trigger_states(session, patient_id): + """Clean up test system problems from moving consent dates""" + log.info(f"Purging trigger states for {patient_id}") + session.query(TriggerState).filter(TriggerState.user_id == patient_id).delete() + + def upgrade(): # Add sequential counts to appropriate trigger_states rows. @@ -53,6 +80,15 @@ def upgrade(): # can't just send through current process, as it'll attempt to # insert undesired rows in the trigger_states table. need to # add the sequential count to existing rows. + try: + validate_users_trigger_states(session, pid) + except ValueError as e: + if current_app.config.get('SYSTEM_TYPE') in ('development', 'test'): + purge_trigger_states(session, pid) + continue + else: + raise e + output.write(f"\n\nPatient: {pid} storing all zeros for sequential hard triggers except:\n") output.write(" (visit month : domain : # hard sequential)\n") sequential_by_domain = defaultdict(list)