Commit
updated ORF block processing
franskloet committed May 15, 2024
1 parent 453a00e commit 51a7299
Showing 2 changed files with 113 additions and 42 deletions.
1 change: 1 addition & 0 deletions .VERSION
@@ -0,0 +1 @@
+v0.91 with updated ORF processing; createPlotData_HDF is the only function that still needs the block approach
154 changes: 112 additions & 42 deletions run_a_site_analysis.py
@@ -142,7 +142,7 @@ def createPlotData_HDF(pDict:dict, postfix="", cutoff_dist=20):
             df_data = hdf_store.get("/raw_aa_plot_data")
         except:
             # load all data in array .. not actually a block function.. for later
-            print("processing plot data per block")
+            print("processing combined plot data")
             df_data = pd.DataFrame()
             _keys = [k for k in hdf_store.keys() if k.startswith("/augmented/block_")]
             for k in _keys:
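The hunk above relies on a read-through cache: the function first tries to fetch precomputed plot data from the HDF store and only rebuilds it from the per-block tables when the key is missing. A narrower `except KeyError:` (or an explicit membership test) would avoid masking unrelated errors. A minimal sketch of the pattern; `get_or_build` and its arguments are illustrative names, not from the repository:

    import pandas as pd

    def get_or_build(store_path, key, build_fn):
        # fetch a cached DataFrame from the HDF store, or build and cache it
        with pd.HDFStore(store_path) as store:
            if key in store:           # explicit test instead of a bare except
                return store.get(key)
            df = build_fn(store)       # e.g. concatenate the /augmented/block_* tables
            store.put(key, df)
            return df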
@@ -239,6 +239,17 @@ def createPlotData_HDF(pDict:dict, postfix="", cutoff_dist=20):
         hdf_store.put("plotdata/filter_{0}".format(filter),df_wrk_plt_export[['position','amino acid','serie','counts']])
 
 
+summary_dict = multiprocessing.Manager().dict()
+def summary_block_thread(index, store_ptr, sema, verbose):
+    if verbose:
+        print("processing block {0} for summary".format(index))
+    df = store_ptr.get(index)
+    genes_summary__ = df.groupby('gene').agg(['count','min'])['gene_len']
+    sema.release()
+    with lock:
+        summary_dict[index] = genes_summary__
+
+
 def createAsiteDistributionPlotData_HDF(pDict:dict):
 
     with pd.HDFStore(pDict[PROJECT_INFO.STORE]) as hdf_store:
@@ -247,22 +258,33 @@ def createAsiteDistributionPlotData_HDF(pDict:dict):
             print("load distribution summary data")
             genes_summary_ = hdf_store.get("/genes_summary_")
 
-        else: # create filters
+        else: # create filters and summary
             print("process distribution data per block")
-            genes_summary_= pd.DataFrame()
 
+            multiprocessing_loop_ = []
+            sema = multiprocessing.Semaphore(pDict[PROJECT_INFO.CPU])
             _keys = [k for k in hdf_store.keys() if k.startswith("/augmented/block_")]
-            for k in _keys:
-                df = hdf_store.get(k)
-                genes_summary__ = df.groupby('gene').agg(['count','min'])['gene_len']
-
-                if(genes_summary_.shape[0]==0):
-                    genes_summary_ = genes_summary__.copy()
-                else:
-                    _df = genes_summary_.merge(genes_summary__,left_on=['gene'],right_on=['gene'],how='outer')
-                    _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
-                    _df['min'] = _df[['min_x','min_y']].min(axis=1)
-                    _df = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
-                    genes_summary_ = _df.copy()
+            for k in _keys:
+                # countdown available semaphores
+                sema.acquire()
+                _process = multiprocessing.Process(target=summary_block_thread, args=(k,hdf_store,sema, pDict[PROJECT_INFO.ARGS].verbose))
+                multiprocessing_loop_.append(_process)
+                _process.start()
+
+            # wait for all processes to finish
+            for process_ in multiprocessing_loop_:
+                process_.join()
+
+            print("joining summary blocks")
+            genes_summary_ = summary_dict[_keys[0]]
+
+            for k in _keys[1:]:
+                _df = summary_dict[k]
+                _df = genes_summary_.merge(_df,left_on=['gene'],right_on=['gene'],how='outer')
+                _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
+                _df['min'] = _df[['min_x','min_y']].min(axis=1)
+                _df = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+                genes_summary_ = _df.copy()
+
         hdf_store.put("/genes_summary_",genes_summary_)
 
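The new code replaces the serial per-block loop with a semaphore-bounded fan-out: at most `pDict[PROJECT_INFO.CPU]` worker processes run at once, each worker releases its semaphore slot when its groupby is done, and results are published to `summary_dict` (a `multiprocessing.Manager().dict()`) under `lock`, which appears to be a module-level `multiprocessing.Lock()` defined elsewhere in the script. Note that handing the open `hdf_store` to each worker relies on fork-style process creation; under the spawn start method each worker would need to open the store itself. A self-contained sketch of the same pattern under those assumptions, with a toy workload standing in for the per-block summary:

    import multiprocessing

    def worker(key, results, lock, sema):
        value = key * key                  # stand-in for the per-block groupby/summary
        sema.release()                     # free a slot before publishing the result
        with lock:
            results[key] = value

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        results = manager.dict()           # dict proxy shared across processes
        lock = multiprocessing.Lock()
        sema = multiprocessing.Semaphore(4)    # at most 4 concurrent workers
        procs = []
        for key in range(10):
            sema.acquire()                 # blocks while 4 workers are already running
            p = multiprocessing.Process(target=worker, args=(key, results, lock, sema))
            procs.append(p)
            p.start()
        for p in procs:
            p.join()
        print(dict(results))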
@@ -321,8 +343,6 @@ class ORF_FILTER(IntEnum):
     L10 = 4
 
 
-
-
 def make_ORF_plot_data(dataIn:pd.DataFrame,dir53:bool=True):
 
     if dir53:
@@ -354,6 +374,23 @@ def make_ORF_plot_data(dataIn:pd.DataFrame,dir53:bool=True):
     return df_plot,total_plot
 
 
+orf_dict = multiprocessing.Manager().dict()
+
+def orf_block_thread(index, store_ptr, sema, verbose):
+
+    if verbose:
+        print("processing block {0} for ORF".format(index))
+    df = store_ptr.get(index)
+    dfm_35 = df.groupby(['posAsite35','dir','gene']).agg(['count','min'])['gene_len'].reset_index()
+    dfm_53 = df.groupby(['posAsite53','dir','gene']).agg(['count','min'])['gene_len'].reset_index()
+
+    sema.release()
+
+    with lock:
+        orf_dict[index]=[dfm_35,dfm_53]
+
+
+
 def prepare_ORF_plot_data_HDF(pDict:dict, filter:ORF_FILTER=ORF_FILTER.NONE):
     #
     # output_file_name = join(_op,file_prefix.replace("_ASITE.pkl","_ORF_unfiltered_data_HDF.pkl"))
@@ -373,23 +410,54 @@ def prepare_ORF_plot_data_HDF(pDict:dict, filter:ORF_FILTER=ORF_FILTER.NONE):
         _nblocks = pDict[PROJECT_INFO.NBLOCKS]
 
         assert len(_keys)==_nblocks, "file corruption error, not enough data blocks found"
-
-        for k in _keys:
-            df = hdf_store.get(k)
-            _dfm_35 = df.groupby(['posAsite35','dir','gene']).agg(['count','min'])['gene_len'].reset_index()
-            _dfm_53 = df.groupby(['posAsite53','dir','gene']).agg(['count','min'])['gene_len'].reset_index()
-            if(dfm_35.shape[0]==0):
-                dfm_35 = _dfm_35.copy()
-                dfm_53 = _dfm_53.copy()
-            else:
-                _df = dfm_35.merge(_dfm_35,left_on=['posAsite35','dir','gene'],right_on=['posAsite35','dir','gene'],how='outer')
-                _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
-                _df['min'] = _df[['min_x','min_y']].min(axis=1)
-                dfm_35 = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
-                _df = dfm_53.merge(_dfm_53,left_on=['posAsite53','dir','gene'],right_on=['posAsite53','dir','gene'],how='outer')
-                _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
-                _df['min'] = _df[['min_x','min_y']].min(axis=1)
-                dfm_53 = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+        multiprocessing_loop_ = []
+        sema = multiprocessing.Semaphore(pDict[PROJECT_INFO.CPU])
+        for k in _keys:
+            # countdown available semaphores
+            sema.acquire()
+            _process = multiprocessing.Process(target=orf_block_thread, args=(k,hdf_store,sema, True))
+            multiprocessing_loop_.append(_process)
+            _process.start()
+
+        # wait for all processes to finish
+        for process_ in multiprocessing_loop_:
+            process_.join()
+
+        print("combining ORF blocks")
+        dfm_35 = orf_dict[_keys[0]][0]
+        dfm_53 = orf_dict[_keys[0]][1]
+        for k in _keys[1:]:
+            _dfm_35 = orf_dict[k][0]
+            _dfm_53 = orf_dict[k][1]
+            _df = dfm_35.merge(_dfm_35,left_on=['posAsite35','dir','gene'],right_on=['posAsite35','dir','gene'],how='outer')
+            _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
+            _df['min'] = _df[['min_x','min_y']].min(axis=1)
+            dfm_35 = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+            _df = dfm_53.merge(_dfm_53,left_on=['posAsite53','dir','gene'],right_on=['posAsite53','dir','gene'],how='outer')
+            _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
+            _df['min'] = _df[['min_x','min_y']].min(axis=1)
+            dfm_53 = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+
+        # dfm_35 = orf_dict[_keys[k]][0].drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+        # dfm_53 = orf_dict[_keys[k]][1].drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+
+
+        # for k in _keys:
+        #     df = hdf_store.get(k)
+        #     _dfm_35 = df.groupby(['posAsite35','dir','gene']).agg(['count','min'])['gene_len'].reset_index()
+        #     _dfm_53 = df.groupby(['posAsite53','dir','gene']).agg(['count','min'])['gene_len'].reset_index()
+        #     if(dfm_35.shape[0]==0):
+        #         dfm_35 = _dfm_35.copy()
+        #         dfm_53 = _dfm_53.copy()
+        #     else:
+        #         _df = dfm_35.merge(_dfm_35,left_on=['posAsite35','dir','gene'],right_on=['posAsite35','dir','gene'],how='outer')
+        #         _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
+        #         _df['min'] = _df[['min_x','min_y']].min(axis=1)
+        #         dfm_35 = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
+        #         _df = dfm_53.merge(_dfm_53,left_on=['posAsite53','dir','gene'],right_on=['posAsite53','dir','gene'],how='outer')
+        #         _df['count'] = _df[['count_x','count_y']].sum(axis=1).astype('int')
+        #         _df['min'] = _df[['min_x','min_y']].min(axis=1)
+        #         dfm_53 = _df.drop(['count_x','count_y','min_x','min_y'],axis=1).copy()
 
         dfm_35['relfreq']=dfm_35['count']/dfm_35['min']
         dfm_53['relfreq']=dfm_53['count']/dfm_53['min']
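Each worker contributes a partial per-group summary, so the reduction above folds the block results together with an outer merge: the `count` columns are summed and the `min` columns (the per-group minimum of `gene_len`, i.e. the gene length itself) take the element-wise minimum, after which `relfreq = count/min` normalises read counts by gene length. The join must be an outer join so groups that appear in only one block are kept. A minimal sketch of that pairwise combine on toy data; `combine` is an illustrative helper, not a function from the script:

    import pandas as pd

    def combine(a, b, keys):
        # outer-merge two partial summaries: counts add up, gene lengths take the min
        df = a.merge(b, on=keys, how='outer')
        df['count'] = df[['count_x','count_y']].sum(axis=1).astype('int')
        df['min'] = df[['min_x','min_y']].min(axis=1)
        return df.drop(['count_x','count_y','min_x','min_y'], axis=1)

    block1 = pd.DataFrame({'gene': ['a','b'], 'count': [3,5], 'min': [900,1200]})
    block2 = pd.DataFrame({'gene': ['b','c'], 'count': [2,7], 'min': [1200,300]})
    total = combine(block1, block2, keys=['gene'])
    total['relfreq'] = total['count'] / total['min']
    print(total)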
@@ -699,7 +767,7 @@ class PROJECT_INFO(IntEnum):
     STORE = 3
     OUTPATH = 4
     BASENAME = 5
-
+    CPU = 6
 
 #%% the main routine for processing
 from tap import Tap # typed-argument-parser
@@ -709,7 +777,7 @@ class PROJECT_INFO(IntEnum):
 if interactive_mode:
     #sys.argv = ['script','--sam','demo/filtered_Histag_Standard.sam','--gff', 'reference_files/wt-prspb-amyI-GFF3.gff','--fa',r'reference_files/WT-Prspb-amyI-FASTA.fa','--nc','10','--nb','100']
     #sys.argv = ['script','--sam','demo/demo_file.sam','--gff', 'reference_files/wt-prspb-amyI-GFF3.gff','--fa',r'reference_files/WT-Prspb-amyI-FASTA.fa','--nb','10']
-    sys.argv = ['script','--sam','demo/demo_1.sam','--gff', 'reference_files/wt-prspb-amyI-GFF3.gff','--fa',r'reference_files/WT-Prspb-amyI-FASTA.fa']
+    sys.argv = ['script','--sam','demo/demo_1.sam','--gff', 'reference_files/wt-prspb-amyI-GFF3.gff','--fa',r'reference_files/WT-Prspb-amyI-FASTA.fa','--orf','--nc','10','--verbose']
 
 class myargs(Tap):
 
@@ -778,21 +846,23 @@ class myargs(Tap):
 folders = get_hdf_items(output_file_full,"augmented")
 nblocks = get_hdf_attribute(output_file_full,"nblocks")
 
+maxcpu = args.nc
+maxcpu = min(multiprocessing.cpu_count(),maxcpu)
+print('{0} cores/threads detected, running on {1} threads'.format(multiprocessing.cpu_count(),maxcpu))
+
 project_data[PROJECT_INFO.NBLOCKS]=nblocks
 project_data[PROJECT_INFO.STORE]=output_file_full
 project_data[PROJECT_INFO.OUTPATH]=dir_path
 project_data[PROJECT_INFO.BASENAME]=sam_file_name_base.lower()
-project_data[PROJECT_INFO.ARGS]=args
+project_data[PROJECT_INFO.ARGS]=args
+project_data[PROJECT_INFO.CPU]=maxcpu
 
 
 if not(exists(output_file_full)) or (len(folders)==0) or (args.ow) or (nblocks!=len(folders)):
 
 
     fasta_str = load_fasta_file(args.fa)
+    gns = load_gene_defs(fasta_str, args.gff)
-
-    maxcpu = args.nc
-    maxcpu = min(multiprocessing.cpu_count(),maxcpu)
-    print('{0} cores/threads detected, running on {1} threads'.format(multiprocessing.cpu_count(),maxcpu))
-    gns = load_gene_defs(fasta_str, args.gff)
 
     concurrency = maxcpu
     total_task_num = nr_blocks
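This last hunk hoists the worker-count calculation ahead of the `project_data` setup, so the cap is computed once, stored under the new `PROJECT_INFO.CPU` key, and available to the semaphores in `createAsiteDistributionPlotData_HDF` and `prepare_ORF_plot_data_HDF` even when the HDF blocks already exist. The clamp itself is a one-liner; a small standalone sketch, where the value 10 stands in for the script's `--nc` argument:

    import multiprocessing

    requested = 10                                        # e.g. the value passed via --nc
    maxcpu = min(multiprocessing.cpu_count(), requested)  # never oversubscribe the machine
    print('{0} cores/threads detected, running on {1} threads'.format(multiprocessing.cpu_count(), maxcpu))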
