-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun_simulator.py
108 lines (95 loc) · 6.05 KB
/
run_simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import logging
import time
import os
logger = logging.getLogger(f'WFsim_{time.strftime("%Y%m%d-%H%M")}')
logger.setLevel(logging.ERROR)
def save_results(resulting_matrix, output_dir):
import numpy as np
import pandas as pd
from config import ALL_INDICES
from utils import call_subprocess
out_path = os.path.join(output_dir, f"{args.sim_name}_results")
numpy_path = f"{out_path}.npy"
pd_path = f"{out_path}.csv"
np.save(numpy_path, resulting_matrix)
pd.DataFrame(resulting_matrix, columns=ALL_INDICES.keys()).to_csv(pd_path, index=False)
if args.compress:
call_subprocess("gzip", [numpy_path, "--force"])
call_subprocess("gzip", [pd_path, "--force"])
def run_simulator(args):
from lib.simulator import simulator
from config import FASTA_IN, DATA_PATH
## Check input arguments
input_fasta = args.input_fasta if args.input_fasta is not None else FASTA_IN
output_dir = args.outdir if args.outdir is not None else os.path.join(DATA_PATH, "simulation_results", time.strftime("%Y%m%d-%H%M%S"))
os.mkdir(output_dir) if not os.path.exists(output_dir) else None
batch_size = args.batch_size
total_simulations = args.n_simulations * batch_size
if total_simulations == 0:
raise Exception("Batch size or n_simulations cannot be 0")
# set prior parameters
prior_params = {
"Ne": {"distribution": str(args.ne_distribution), "min": args.n_individuals_min, "max": args.n_individuals_max, "mean": args.n_individuals_mean, "std": args.n_individuals_std},
"mutation_rate": {"distribution": str(args.mu_distribution), "min": args.mutation_rate_min, "max": args.mutation_rate_max, "mean": args.mutation_rate_mean, "std": args.mutation_rate_std}
}
## Run simulations
m = simulator(
n_generations=args.n_generations,
n_individuals=args.n_individuals, ## Class randomly generates this
max_mutations=args.max_mutations,
batch_size=args.batch_size,
n_repeats=args.n_simulations,
mutation_rate=args.mutation_rate, ## Class randomly generates this
input_fasta=input_fasta,
work_dir=args.workdir,
outdir=output_dir,
filter_below=args.filter_below,
save_data=args.save_all_data,
add_parameters=True,
prior_parameters=prior_params,
)
##Save simulation results
save_results(m,output_dir)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='Run Wright-Fisher simulator for Effective population size and mutation rate parameters')
parser.add_argument('--n_simulations', type=int, help='Number of simulations to run', required=True, default=5)
parser.add_argument('--sim_name', type=str, help='Name of your simulation', required=False, default="simulation")
parser.add_argument('--input_fasta', type=str, help='Input fasta file. If not provided, uses a predefined one', required=False, default=None)
parser.add_argument('--n_generations', type=int, help='Number of generations to simulate', required=False, default=20)
parser.add_argument('--n_individuals', type=int, help='Number of individuals to simulate', required=False, default=None)
parser.add_argument('--mutation_rate', type=float, help='Mutation rate to simulate', required=False, default=None)
parser.add_argument('--max_mutations', type=int, help='Maximum number of mutations to simulate', required=False, default=200)
parser.add_argument('--batch_size', type=int, help='Batch size to simulate', required=False, default=1)
parser.add_argument('--workdir', type=str, help='Work directory to simulate', required=False, default=None)
parser.add_argument('--filter_below', type=float, help='Filter below', required=False, default=None)
parser.add_argument('--outdir', type=str, help='Output directory for simulations', required=False, default=None)
parser.add_argument('--compress', action='store_true', help='Compress the output files', required=False)
parser.add_argument('--save_all_data', action='store_true', help='Save all data regarding the simulations', required=False)
parser.add_argument('--n_individuals_min', type=int, help='Minimum number of individuals to simulate',
required=False, default=10)
parser.add_argument('--n_individuals_mean', type=int, help='Mean of individuals to simulate, required if distribution is normal',
required=False, default=1000)
parser.add_argument('--n_individuals_std', type=int, help='Std of individuals to simulate, required if distribution is normal',
required=False, default=200)
parser.add_argument('--n_individuals_max', type=int, help='Maximum number of individuals to simulate',
required=False, default=1000)
parser.add_argument('--mutation_rate_min', type=float, help='Minimum mutation rate to simulate', required=False,
default=0.0001)
parser.add_argument('--mutation_rate_max', type=float, help='Maximum mutation rate to simulate', required=False,
default=0.1)
parser.add_argument('--mutation_rate_mean', type=float, help='Mean mutation rate to simulate, required if distribution is normal', required=False,
default=0.05)
parser.add_argument('--mutation_rate_std', type=float, help='Std of mutation rate to simulate, required if distribution is normal', required=False,
default=0.1)
parser.add_argument('--mu_distribution', type=str, help='Mutation rate distribution', required=False,
default="uniform")
parser.add_argument('--ne_distribution', type=str, help='Effective population size distribution', required=False,
default="uniform")
args = parser.parse_args()
import timeit
start = timeit.default_timer()
run_simulator(args)
stop = timeit.default_timer()
eplased_time = stop - start
print(f"Time taken: {eplased_time/60} minutes")