Update of OMOP conversion due to remarks in Issue #11
USM-CHU-FGuyon committed Dec 7, 2023
1 parent 2a266e2 commit 292f119
Showing 3 changed files with 159 additions and 36 deletions.
8 changes: 4 additions & 4 deletions 4_write_omop.py
@@ -11,10 +11,10 @@
"""
from blended_preprocessing.omop_conversion import OMOP_converter

c = OMOP_converter(initialize_tables=True)
self = OMOP_converter(initialize_tables=True)

c.measurement_table()
self.measurement_table(start_chunk=0)

c.drug_exposure_table()
self.drug_exposure_table(start_chunk=0)

c.export_tables()
self.export_tables()
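Note: the driver's handle was renamed and the table builders now take an explicit start_chunk, which makes the long conversion resumable: if a run dies partway through, the tables can be rebuilt from the last unfinished chunk instead of from zero. A minimal sketch of a resume helper built on the chunked file layout (find_resume_chunk and the 'exported/MEASUREMENT/' path are illustrative assumptions, not part of the repository):

from pathlib import Path

from blended_preprocessing.omop_conversion import OMOP_converter

def find_resume_chunk(savedir, name):
    # Hypothetical helper: chunk files are written as '{name}_{i}.parquet',
    # so resume at the highest index found (re-writing that chunk in case
    # the file was only partially written when the previous run stopped).
    indices = [int(p.stem.rsplit('_', 1)[1])
               for p in Path(savedir).glob(f'{name}_*.parquet')]
    return max(indices) if indices else 0

converter = OMOP_converter(initialize_tables=True)
converter.measurement_table(
    start_chunk=find_resume_chunk('exported/MEASUREMENT/', 'MEASUREMENT'))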
171 changes: 141 additions & 30 deletions blended_preprocessing/omop_conversion.py
Expand Up @@ -3,6 +3,7 @@

import pandas as pd
import numpy as np
import pyarrow as pa

from blended_preprocessing.timeseries import blendedicuTSP
from omop_cdm import cdm
@@ -20,16 +21,10 @@ def __init__(self,
self.end_date = datetime(year=2099, month=12, day=31)
self.adm_measuredat = self.flat_hr_from_adm.total_seconds()
self.admission_data_datetime = (self.ref_date + pd.Timedelta(self.adm_measuredat, unit='second'))
self.n_chunks = 100
if full_init:
self.labels = self._load_labels()
self.n_chunks = 100
ts_pth = self.data_pth+'formatted_timeseries/'
med_pth = self.data_pth+'formatted_medications/'
self.ts_pths = self.rglob(ts_pth, '*.parquet')
self.med_pths = self.rglob(med_pth, '*.parquet')

self.ts_pths_chunks = self._get_chunks(self.ts_pths)
self.med_pths_chunks = self._get_chunks(self.med_pths)
self.ts_pths_chunks, self.med_pths_chunks = self._get_pth_chunks()

pth_concept_table = fr'{self.aux_pth}OMOP_vocabulary/CONCEPT.parquet'
self.omop_concept = pd.read_parquet(pth_concept_table,
@@ -144,9 +139,91 @@ def __init__(self,

self.units = self._get_units()

self.measurement_schema = self._measurement_schema()
self.observation_schema = self._observation_schema()
self.drug_exposure_schema = self._drug_exposure_schema()

def _get_pth_chunks(self, shuffleseed=974):
'''
rglob lists all files, then sorts and shuffles them with a fixed
seed to produce a reproducible unsorted order.
The paths are then split into a list of chunks.
'''
ts_pths = self.rglob(self.data_pth+'formatted_timeseries/',
'*.parquet',
verbose=True,
sort=True,
shuffleseed=shuffleseed)
med_pths = self.rglob(self.data_pth+'formatted_medications/',
'*.parquet',
verbose=True,
sort=True,
shuffleseed=shuffleseed)

ts_pths_chunks = self._get_chunks(ts_pths)
med_pths_chunks = self._get_chunks(med_pths)
return ts_pths_chunks, med_pths_chunks
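The sort-then-shuffle sequence is what makes the chunking reproducible: rglob returns paths in filesystem order, which varies between machines, so the paths are first put into a deterministic order and only then shuffled with a fixed seed. A standalone sketch of the idea (simplified; the real method goes through the class's rglob and _get_chunks):

import random
import numpy as np

def reproducible_chunks(paths, n_chunks, seed=974):
    ordered = sorted(paths)   # machine-independent starting order
    random.seed(seed)
    random.shuffle(ordered)   # deterministic pseudo-random order
    return [list(c) for c in np.array_split(ordered, n_chunks)]

a = reproducible_chunks([f'f{i}.parquet' for i in range(10)], 3)
b = reproducible_chunks([f'f{i}.parquet' for i in reversed(range(10))], 3)
assert a == b  # identical chunks regardless of listing order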

def _measurement_schema(self):
schema = pa.schema([('value_as_number', pa.float32()),
('time', pa.float32()),
('visit_occurrence_id', pa.int64()),
('visit_start_date', pa.date32()),
('visit_source_value', pa.string()),
('person_id', pa.int32()),
('measurement_datetime', pa.date64()),
('measurement_date', pa.date32()),
('measurement_time', pa.time32('s')),
('measurement_concept_id', pa.int32()),
('measurement_source_value', pa.float32()),
('unit_source', pa.string()),
('unit_concept_id', pa.int32()),
('measurement_id', pa.float32()),
])
return schema

def _observation_schema(self):
schema = pa.schema([('observation_id', pa.int32()),
('person_id', pa.int32()),
('observation_concept_id', pa.int32()),
('observation_date', pa.date32()),
('observation_datetime', pa.time32('s')),
('observation_type_concept_id', pa.int32()),
('value_as_number', pa.float64()),
('value_as_string', pa.string()),
('value_as_concept_id', pa.int32()),
('qualifier_concept_id', pa.float32()),
('unit_concept_id', pa.int32()),
('provider_id', pa.float32()),
('visit_occurrence_id', pa.int64()),
('visit_detail_id', pa.float32()),
('observation_source_value', pa.float32()),
('observation_source_concept_id', pa.int32()),
('unit_source_value', pa.string()),
('qualifier_source_value', pa.string()),
('value_source_value', pa.float64()),
('observation_event_id', pa.float64()),
('obs_event_field_concept_id', pa.float64()),
])
return schema

def _drug_exposure_schema(self):
schema = pa.schema([('drug_source_value', pa.string()),
('drug_type_concept_id', pa.int32()),
('visit_occurrence_id', pa.int64()),
('person_id', pa.int32()),
('drug_exposure_start_date', pa.date32()),
('drug_exposure_start_datetime', pa.string()),
('drug_exposure_end_date', pa.date32()),
('drug_exposure_end_datetime', pa.string()),
('drug_concept_id', pa.int32()),
])
return schema
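Each export now carries an explicit pyarrow schema. Without one, parquet types are inferred per chunk, so a column that happens to be empty or all-NaN in one chunk can come out with a different type than in its neighbours, and the chunk files can no longer be read back together. A small illustration of the failure mode the schemas guard against (toy data, not from the pipeline):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([('unit_concept_id', pa.int32()),
                    ('value_as_number', pa.float32())])

# A chunk where every unit_concept_id is missing would be inferred as a
# null column; the explicit schema forces int32 so it matches other chunks.
chunk = pd.DataFrame({'unit_concept_id': [None, None],
                      'value_as_number': [1.0, 2.5]})
chunk.to_parquet('chunk_0.parquet', schema=schema)
print(pq.read_schema('chunk_0.parquet'))  # unit_concept_id: int32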



def _get_chunks(self, pths):
return map(list, np.array_split(pths, len(pths)/self.n_chunks))
return map(list, np.array_split(pths, self.n_chunks))
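The change to _get_chunks also fixes the split semantics: in the old call the second argument was len(pths) / self.n_chunks, so n_chunks effectively acted as a chunk size and the number of chunks grew with the dataset; now n_chunks is the actual number of chunks. A quick check of the two behaviours (illustrative numbers):

import numpy as np

pths = list(range(1000))
n_chunks = 100

old = np.array_split(pths, len(pths) / n_chunks)  # 10 chunks of ~100 paths
new = np.array_split(pths, n_chunks)              # 100 chunks of ~10 paths
print(len(old), len(new))                         # 10 100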

def source_to_concept_map_table(self):
ts_mapping = self.cols.concept_id.dropna().astype(int)
@@ -259,16 +336,20 @@ def death_table(self):
self.death['death_date'] = death_datetimes.dt.date
self.death['death_datetime'] = death_datetimes.dt.time

def _add_measurement(self, varname, timeseries=None):
def _add_measurement(self, varname, timeseries=None, patients=[]):
self.ts = timeseries

print(f'collecting {varname}')
if timeseries is None:
vals = self.labels.loc[:, ['patient', varname]]
keep_idx = self.labels.patient.isin(patients)
vals = self.labels.loc[keep_idx, ['patient', varname]]
vals['time'] = self.adm_measuredat
else:
# TODO : HANDLE KEYERROR !
vals = timeseries.loc[:, ['time', 'patient', varname]]
try:
vals = timeseries.loc[:, ['time', 'patient', varname]].dropna()
except KeyError:
print(f'Key {varname} not found')
return self.measurement

vals = vals.rename(columns={varname: 'value_as_number'})
self.vals = vals
@@ -285,8 +366,9 @@ def _add_measurement(self, varname, timeseries=None):
left_on='patient',
right_on='visit_source_value')

vals['measurement_datetime'] = self.ref_date + \
vals['time'].apply(pd.Timedelta, unit='second')
vals['measurement_datetime'] = (self.ref_date + \
vals['time'].apply(pd.Timedelta, unit='second')).astype('datetime64[ns]')

vals['measurement_date'] = vals['measurement_datetime'].dt.date
vals['measurement_time'] = vals['measurement_datetime'].dt.time

@@ -298,7 +380,8 @@ def _add_measurement(self, varname, timeseries=None):
vals = vals.drop(columns=['patient'])
return self.concat([self.measurement, vals])
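The try/except replaces the old TODO: a variable that is absent from a chunk's timeseries (not every source database records every signal) now just logs a message and leaves the accumulated measurement table untouched. An equivalent guard without the exception, sketched here rather than taken from the repository, is to reindex the wanted columns so that a missing variable becomes all-NaN and is discarded by the same dropna():

import pandas as pd

def select_measurements(timeseries, varname):
    # reindex never raises on a missing column; an absent variable becomes
    # an all-NaN column, so dropna() then removes every row.
    cols = ['time', 'patient', varname]
    return timeseries.reindex(columns=cols).dropna()

(As a side note, the patients=[] mutable default in the new signature is harmless here because the list is never mutated, though a tuple default would be the more defensive idiom.)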

def measurement_table(self):

def measurement_table(self, start_chunk=0):
start_index = self.start_index['measurement']
self.admission_measurements = ['height', 'weight']
self.ts_measurements = [
@@ -322,24 +405,33 @@ def measurement_table(self):
'glasgow_coma_score_motor', 'glasgow_coma_score_verbal']

for i, pth_chunk in enumerate(self.ts_pths_chunks):
print(f'Chunk {i}/{self.n_chunks}')
if i < start_chunk:
continue
print(f'Measurement chunk {i}/{self.n_chunks}')

self.measurement = cdm.tables['MEASUREMENT'].copy()
chunk = pd.read_parquet(pth_chunk).reset_index()
self.chunk = chunk

for varname in self.admission_measurements:
self.measurement = self._add_measurement(varname)
self.measurement = self._add_measurement(varname,
patients=chunk.patient.unique())

for varname in self.ts_measurements:
self.measurement = self._add_measurement(varname,
timeseries=chunk)

self.measurement['measurement_id'] = self.measurement.index + start_index
self.measurement['measurement_id'] = (self.measurement.index + start_index).astype('float32')
start_index = start_index+self.start_index['measurement']

self.measurement['visit_start_date'] = pd.to_datetime(self.measurement['visit_start_date'])
self.measurement['measurement_date'] = pd.to_datetime(self.measurement['measurement_date'])

self.export_table(self.measurement,
'measurement',
'MEASUREMENT',
mode='parquet',
chunkindex=i)
chunkindex=i,
schema=self.measurement_schema)
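The two pd.to_datetime calls before the export were presumably added so that pyarrow receives typed datetime64[ns] columns rather than object columns: after the .dt.date accessor, the columns hold plain datetime.date objects, and converting them back gives the schema's date32 fields a clean, typed input to cast from. A minimal demonstration of the dtype round-trip (standalone, not the pipeline's data):

import pandas as pd

s = pd.Series(pd.to_datetime(['2023-12-07', '2023-12-08']))
dates = s.dt.date                   # object dtype holding datetime.date
print(dates.dtype)                  # object
print(pd.to_datetime(dates).dtype)  # datetime64[ns], castable to date32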


def _add_observation(self, breaks, column, concept_id, unit_concept_id):
@@ -381,6 +473,9 @@ def observation_table(self):
self.observation['observation_id'] = np.arange(start_index,
start_index+len(self.observation))

self.observation['observation_date'] = pd.to_datetime(self.observation['observation_date'])


def _add_drugs(self, chunk):
df = pd.DataFrame()
df['_patient'] = chunk.patient
@@ -408,19 +503,28 @@ def _add_drugs(self, chunk):
return self.concat([self.drug_exposure, df]).set_index('drug_exposure_id')


def drug_exposure_table(self):
self.drug_exposure = cdm.tables['DRUG_EXPOSURE'].copy()
start_index = self.start_index['drug_exposure']
def drug_exposure_table(self, start_chunk=0):
for i, pth_chunk in enumerate(self.med_pths_chunks):
print(i)
if i < start_chunk:
continue
self.drug_exposure = cdm.tables['DRUG_EXPOSURE'].copy()
print(f'Drug exposure chunk {i}/{self.n_chunks}')
chunk = pd.read_parquet(pth_chunk).reset_index()
self.chunk = chunk
self.drug_exposure = self._add_drugs(chunk)
self.drug_exposure = (chunk.pipe(self._add_drugs)
.drop(columns='_patient'))

self.drug_exposure['drug_exposure_start_date'] = pd.to_datetime(self.drug_exposure['drug_exposure_start_date'])
self.drug_exposure['drug_exposure_end_date'] = pd.to_datetime(self.drug_exposure['drug_exposure_end_date'])
self.drug_exposure['drug_exposure_start_datetime'] = self.drug_exposure['drug_exposure_start_datetime'].astype(str)
self.drug_exposure['drug_exposure_end_datetime'] = self.drug_exposure['drug_exposure_end_datetime'].astype(str)

self.export_table(self.drug_exposure,
'DRUG_EXPOSURE',
mode='parquet',
chunkindex=i)
chunkindex=i,
schema=self.drug_exposure_schema,
)
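The start/end datetimes are cast to str because the drug exposure schema declares those fields as pa.string(); the astype(str) cast also turns missing values into the literal string 'NaT', which a string column accepts without complaint. A minimal illustration (toy frame, not the pipeline's data):

import pandas as pd

df = pd.DataFrame({'drug_exposure_start_datetime':
                   pd.to_datetime(['2023-12-07 08:30:00', None])})
df['drug_exposure_start_datetime'] = (
    df['drug_exposure_start_datetime'].astype(str))
print(df['drug_exposure_start_datetime'].tolist())
# ['2023-12-07 08:30:00', 'NaT']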

def care_site_table(self):
print('Care_site table...')
@@ -603,7 +707,12 @@ def _load_labels(self):
df = pd.read_parquet(labels_pth).reset_index()
return df

def export_table(self, table, name, mode='w', chunkindex=None):
def export_table(self,
table,
name,
mode='w',
chunkindex=None,
schema=None):
"""
Exports a table
* to csv file if mode is "w" or "a" with the corresponding mode.
@@ -619,7 +728,7 @@ def export_table(self, table, name, mode='w', chunkindex=None):
Path(savedir).mkdir(exist_ok=True, parents=True)
savepath = f'{savedir}{name}_{chunkindex}.parquet'
print(f'Saving {savepath}')
table.to_parquet(savepath)
table.to_parquet(savepath, schema=schema)
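The optional schema is forwarded by to_parquet to the pyarrow engine, so every chunk of a table lands on disk with identical column types. That, in turn, is what allows the per-chunk files to be opened as one logical dataset — a sketch, assuming pyarrow is the active parquet engine and using an illustrative output path:

import pyarrow.dataset as ds

# All MEASUREMENT_{i}.parquet chunks share one schema, so they can be
# read back as a single table.
measurement = ds.dataset('exported/MEASUREMENT/', format='parquet').to_table()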

def export_tables(self):
"""
@@ -634,7 +743,9 @@ def export_tables(self):
self.export_table(self.concept.reset_index(), 'CONCEPT')
self.export_table(self.death, 'DEATH')
self.export_table(self.care_site, 'CARE_SITE')
self.export_table(self.observation, 'OBSERVATION')
self.export_table(self.observation,
'OBSERVATION',
schema=self.observation_schema)
self.export_table(self.person, 'PERSON')
self.export_table(self.source_to_concept_map, 'SOURCE_TO_CONCEPT_MAP')
self.export_table(self.visit_occurrence, 'VISIT_OCCURRENCE')
16 changes: 14 additions & 2 deletions database_processing/dataprocessor.py
@@ -4,7 +4,9 @@
import json
import shutil
import chardet
import random

import natsort
import pandas as pd


@@ -134,11 +136,21 @@ def rmdir(self, pth):
except FileNotFoundError:
pass

def rglob(self, pth, reg):
def rglob(self, pth, reg, verbose=False, sort=False, shuffleseed=None):
"""
alias for using rglob.
"""
return [*Path(pth).rglob(reg)]
if verbose: print(f' Iterate {pth}{reg}...')
_rglob = Path(pth).rglob(reg)
if verbose: print(' Cast to list...')
_rglob_list = list(_rglob)
if verbose: print(' done.')
if sort:
_rglob_list = natsort.natsorted(_rglob_list)
if shuffleseed is not None:
random.seed(shuffleseed)
random.shuffle(_rglob_list)
return _rglob_list
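natsort.natsorted gives a natural, OS-independent ordering (chunk_2 before chunk_10), which is what makes the downstream seeded shuffle reproducible across machines. A quick comparison with plain lexicographic sorting:

import natsort

paths = ['chunk_10.parquet', 'chunk_2.parquet', 'chunk_1.parquet']
print(sorted(paths))
# ['chunk_1.parquet', 'chunk_10.parquet', 'chunk_2.parquet']
print(natsort.natsorted(paths))
# ['chunk_1.parquet', 'chunk_2.parquet', 'chunk_10.parquet']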

def reset_dir(self):
pth_preprocessed_ts = Path(self.preprocessed_ts_dir)
