
Merge pull request #21 from uclahs-cds/yupan-standardize-config-structure

Standardized Config Structure
yupan-ucla authored Mar 4, 2022
2 parents d4b7483 + 6feaffb commit 0c8d8ca
Showing 8 changed files with 225 additions and 105 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/issue_report.md
@@ -10,8 +10,8 @@ assignees: ''
**Describe the issue**
A clear and concise description of what the issue is. Please include the following in your issue report, along with any explicit errors observed:
* Pipeline release version
* Cluster you are using (SGE/Slurm-Dev/Slurm-Test)
* Node type (F2 / F72 / M64 )
* Cluster you are using (Slurm-Dev/Slurm-Test)
* Node type (F2 / F32 / F72 / M64)
* Submission method (interactive/submission script)
* Actual submission script (python submission script, "nextflow run ...", etc.)
* Sbatch or qsub command and logs if applicable
5 changes: 2 additions & 3 deletions README.md
Expand Up @@ -15,7 +15,7 @@

## Overview:
The call-sSV pipeline calls somatic structural variants utilizing [Delly](https://github.com/dellytools/delly). This pipeline requires at least one tumor sample and a matched control sample.
This pipeline is developed using Nextflow, docker and can run either on a single node linux machine or a multi-node HPC cluster (e.g. Slurm, SGE).
This pipeline is developed using Nextflow and Docker, and can run either on a single-node Linux machine or on a multi-node HPC Slurm cluster.

## How to Run:

@@ -71,7 +71,6 @@ The input CSV should have each of the input fields listed below as separate columns
| ------- | --------- | ------ | -------------|
| dataset_id | yes | string | Boutros Lab dataset id |
| blcds_registered_dataset | yes | boolean | Affirms if dataset should be registered in the Boutros Lab Data registry. Default value is false. |
| sge_scheduler | yes | boolean | Affirms whether job will be executed on the SGE cluster. Default value is false. |
| input_csv | yes | string | Absolute path to the input CSV file for the pipeline. |
| reference_fasta | yes | path | Absolute path to the reference genome fasta file. The reference genome is used by Delly for structural variant calling. |
| exclusion_file | yes | path | Absolute path to the delly reference genome exclusion file utilized to remove suggested regions for structural variant calling. |
@@ -80,7 +79,7 @@ The input CSV should have each of the input fields listed below as separate columns
| mad_cutoff | yes | integer | Insert size cutoff, median+s*MAD (deletions only) |
| save_intermediate_files | yes | boolean | Optional parameter to indicate whether intermediate files will be saved. Default value is true. |
| output_dir | yes | path | Absolute path to the directory where the output files are saved. |
| temp_dir | yes | path | Absolute path to the directory where the nextflow’s intermediate files are saved. |
| work_dir | no | path | Path of working directory for Nextflow. When included in the sample config file, Nextflow intermediate files and logs will be saved to this directory. With ucla_cds, the default is /scratch and should only be changed for testing/development. Changing this directory to /hot or /tmp can lead to high server latency and potential disk space limitations, respectively. |
| verbose | false | boolean | If set to true, the values of input channels will be printed; can be used for debugging. |
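Taken together, the parameters above can be supplied in a single sample config file. The sketch below is illustrative only; every path and the dataset id are placeholders, not values from this repository:

```groovy
// Hypothetical sample.config; all paths and the dataset id are placeholders.
params {
    dataset_id               = 'ILHNLNEV000'   // must be eleven characters long
    blcds_registered_dataset = false
    input_csv                = '/path/to/input.csv'
    reference_fasta          = '/path/to/genome.fa'
    exclusion_file           = '/path/to/delly_exclusions.tsv'
    mad_cutoff               = 9
    save_intermediate_files  = true
    output_dir               = '/path/to/output'
    work_dir                 = '/scratch'      // optional; ucla_cds default
    verbose                  = false
}
```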

## Outputs
1 change: 0 additions & 1 deletion pipeline/call-sSV.nf
@@ -25,7 +25,6 @@ Current Configuration:
- output:
output_dir: "${params.output_dir}"
log_output_dir: "${params.log_output_dir}"
temp_dir: "${params.temp_dir}"
- options:
save_intermediate_files: ${params.save_intermediate_files}
21 changes: 21 additions & 0 deletions pipeline/config/base.config
@@ -0,0 +1,21 @@
process {
cpus = { methods.check_max( 1 * task.attempt, 'cpus' ) }

errorStrategy = { task.exitStatus in [143, 137, 104, 134, 139] ? 'retry' : 'finish' }
maxRetries = 1

withLabel:process_low {
cpus = { methods.check_max( 2 * task.attempt, 'cpus' ) }
memory = { methods.check_max( 3.GB * task.attempt, 'memory' ) }
}

withLabel:process_medium {
cpus = { methods.check_max( 6 * task.attempt, 'cpus' ) }
memory = { methods.check_max( 42.GB * task.attempt, 'memory' ) }
}

withLabel:process_high {
cpus = { methods.check_max(12 * task.attempt, 'cpus' ) }
memory = { methods.check_max( 84.GB * task.attempt, 'memory' ) }
}
}
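A process opts into one of these resource tiers through Nextflow's `label` directive. A minimal sketch, assuming a hypothetical process name rather than one from this pipeline:

```groovy
// Hypothetical process using the tiers defined in base.config.
process example_delly_call {
    // 12 CPUs / 84 GB on the first attempt; task.attempt doubles the request
    // on retry, and methods.check_max caps it at the node limits.
    label 'process_high'

    input:
    tuple path(tumor_bam), path(control_bam)

    script:
    """
    delly call --version
    """
}
```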
38 changes: 38 additions & 0 deletions pipeline/config/default.config
@@ -0,0 +1,38 @@
import nextflow.util.SysHelper

docker {
enabled = true
// Pass user's UID/GID and group IDs to Docker
uid_and_gid = "-u \$(id -u):\$(id -g)"
all_group_ids = "\$(for i in `id --real --groups`; do echo -n \"--group-add=\$i \"; done)"

runOptions = "${uid_and_gid} ${all_group_ids}"
}

params {
max_cpus = SysHelper.getAvailCpus()
max_memory = SysHelper.getAvailMemory()

cache_intermediate_pipeline_steps = false
save_intermediate_files = true

ucla_cds = true

blcds_registered_dataset = false

// Pipeline tool versions
delly_version = '0.9.1'
bcftools_version = '1.12'
validate_version = '2.1.5'

// Docker tool versions
docker_image_delly = "blcdsdockerregistry/delly:${params.delly_version}"
docker_image_bcftools = "blcdsdockerregistry/bcftools:${params.bcftools_version}"
docker_image_validate = "blcdsdockerregistry/validate:${params.validate_version}"
}

process {
executor = 'local'
echo = true
cache = true
}
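At runtime Nextflow interpolates the two shell fragments before each container launches. Assuming a user with UID/GID 1000 who belongs to groups 1000 and 2049 (illustrative values only), the options resolve roughly to:

```groovy
// Illustrative expansion of runOptions for UID/GID 1000, groups 1000 and 2049:
//
//   -u 1000:1000 --group-add=1000 --group-add=2049
//
// Every container therefore runs as the submitting user rather than root,
// so files written to shared filesystems keep the correct ownership.
```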
224 changes: 140 additions & 84 deletions pipeline/config/methods.config
@@ -1,22 +1,4 @@
import nextflow.util.SysHelper

manifest {
name = "call-sSV"
mainScript = "call-sSV.nf"
nextflowVersion = ">=20.07.1"
author = "Yu Pan, Ghouse Mohammed"
homePage = "https://github.com/uclahs-cds/pipeline-call-sSV"
description = "A pipeline to call somatic SVs utilizing Delly"
version = "2.0.0"
}

methods {
set_docker_urls = {
params.docker_image_delly = "blcdsdockerregistry/delly:${params.delly_version}"
params.docker_image_bcftools = "blcdsdockerregistry/bcftools:${params.bcftools_version}"
params.docker_image_validate = "blcdsdockerregistry/validate:${params.validate_version}"
}

check_permissions = { path ->
def filePath = new File(path)

@@ -42,112 +24,186 @@ methods {
reader.splitEachLine(',') { parts -> [sample = parts[1].split('/')[-1].split('.bam')[0]] }
//reader.splitEachLine(',') { parts -> [sample = parts[1].tokenize('/')[-1].tokenize('.bam').join('.')] }

def date = new Date().format('yyyyMMdd-HHmmss')
if (params.sge_scheduler) {
params.avere_prefix = '/data/data'
} else {
params.avere_prefix = '/hot/data'
}
tz = TimeZone.getTimeZone('UTC')
def date = new Date().format("yyyyMMdd'T'HHmmss'Z'", tz)

params.dataset_registry_prefix = '/hot/data'

if (params.blcds_registered_dataset == true) {
if (params.dataset_id.length() != 11) {
throw new Exception("Dataset id must be eleven characters long")
}
}
def disease = "${params.dataset_id.substring(0,4)}"
// Need to fill in analyte, technology, raw_od_aligned, genome, pipeline-name
params.log_output_dir = "${params.avere_prefix}/$disease/${params.dataset_id}/${project}/${sample}/analyte/technology,raw_or_aligned/genome/logs/pipeline-name/$date"
params.log_output_dir = "${params.dataset_registry_prefix}/$disease/${params.dataset_id}/${project}/${sample}/analyte/technology,raw_or_aligned/genome/logs/pipeline-name/$date"
params.disease = "${disease}"
} else {
}
else {
params.log_output_dir = "${params.output_dir}/${manifest.name}-${manifest.version}/${sample}/log-${manifest.name}-${manifest.version}-${date}"
params.disease = null
}
}

params.sample = "${sample}"
params.date = "${date}"
}
}

set_output_dir = {
params.output_dir = "${params.output_dir}/${manifest.name}-${manifest.version}/${params.sample}/${params.docker_image_delly.split("/")[1].replace(':', '-').toUpperCase()}"
}

// Process specific scope
set_process = {
// monitor process jobs with local (not slurm) executor
process.executor = "local"
// echo stdout of each step to stdout of pipeline
process.echo = true
process.cache = params.cache_intermediate_pipeline_steps
// Function to ensure that resource requirements don't go beyond
// a maximum limit
check_max = { obj, type ->
if (type == 'memory') {
try {
if (obj.compareTo(params.max_memory as nextflow.util.MemoryUnit) == 1)
return params.max_memory as nextflow.util.MemoryUnit
else
return obj
}
catch (all) {
println " ### ERROR ### Max memory '${params.max_memory}' is not valid! Using default value: $obj"
return obj
}
}
else if (type == 'time') {
try {
if (obj.compareTo(params.max_time as nextflow.util.Duration) == 1)
return params.max_time as nextflow.util.Duration
else
return obj
}
catch (all) {
println " ### ERROR ### Max time '${params.max_time}' is not valid! Using default value: $obj"
return obj
}
}
else if (type == 'cpus') {
try {
return Math.min(obj, params.max_cpus as int)
}
catch (all) {
println " ### ERROR ### Max cpus '${params.max_cpus}' is not valid! Using default value: $obj"
return obj
}
}
}
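To illustrate the clamping behavior, assume hypothetical limits of `params.max_cpus = 16` and `params.max_memory = 64.GB`:

```groovy
// Hypothetical limits: params.max_cpus = 16, params.max_memory = 64.GB
//   methods.check_max(12 * 2, 'cpus')      // returns 16; 24 exceeds the cap
//   methods.check_max(42.GB, 'memory')     // returns 42.GB; within the cap
//   methods.check_max(84.GB * 2, 'memory') // returns 64.GB; 168.GB exceeds the cap
```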

set_resources_allocation = {
// Function to ensure that resource requirements don't go beyond
// a maximum limit
node_cpus = params.max_cpus
node_memory_GB = params.max_memory.toGiga()
// Load base.config by default for all pipelines
includeConfig "${projectDir}/config/base.config"
if (params.ucla_cds) {
if (node_cpus == 64) {
// Check memory for M64 node
if (node_cpus == 64 && node_memory_GB >= 950 && node_memory_GB <= 1010) {
includeConfig "${projectDir}/config/M64.config"
}
else {
throw new Exception(" ### ERROR ### System resources not as expected (cpus=${node_cpus} memory=${node_memory_GB}), unable to assign resources.")
}
}
else {
// Check memory for F series node
if (node_memory_GB >= (node_cpus * 2 * 0.9) && node_memory_GB <= (node_cpus * 2)) {
includeConfig "${projectDir}/config/F${node_cpus}.config"
}
else {
throw new Exception(" ### ERROR ### System resources not as expected (cpus=${node_cpus} memory=${node_memory_GB}), unable to assign resources.")
}
}
}
}
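The F-series branch accepts a node whose reported memory falls between 90% and 100% of 2 GB per CPU. With the formula above, the acceptance windows work out to roughly:

```groovy
// node_memory_GB must satisfy: cpus * 2 * 0.9 <= mem <= cpus * 2
//   F2:  2 CPUs  ->   3.6 GB <= mem <=   4 GB -> loads config/F2.config
//   F32: 32 CPUs ->  57.6 GB <= mem <=  64 GB -> loads config/F32.config
//   F72: 72 CPUs -> 129.6 GB <= mem <= 144 GB -> loads config/F72.config
// A 64-CPU node is instead matched against the M64 window (950 to 1010 GB).
```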


/**
* Check the permissions and existence of workDir.
* If it doesn't exist, recursively find first existing directory and check write permission.
* If it exists, check write permission.
*/
check_workdir_permissions = { dir ->
dir_file = new File(dir)
if (dir_file.exists()) {
if (dir_file.canWrite()) {
return true
}
else {
throw new Exception(" ### ERROR ### The input directory params.work_dir: ${dir} is not writeable. Please verify and try again.")
}
}
else {
while (!dir_file.exists()) {
dir_file = dir_file.getParentFile()
}

if (dir_file.canWrite()) {
return true
}
else {
throw new Exception(" ### ERROR ### The input directory params.work_dir: ${dir} cannot be created. The closest existing parent directory ${dir_file.toString()} is not writable. Please verify permissions or change the input parameter.")
}
}
}
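For example (hypothetical paths), both branches of the check reduce to a write-permission test:

```groovy
// Existing, writable directory: returns true immediately.
methods.check_workdir_permissions('/scratch')
// Not-yet-created directory: walks up to the closest existing parent
// (here /scratch) and checks that it is writable instead.
methods.check_workdir_permissions('/scratch/new-run/work')
```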

// Location of Nextflow temp directories
set_env = {
workDir = params.temp_dir
if (params.ucla_cds) {
/**
* By default, if the /scratch directory exists, set it as the Nextflow working directory
* If config file specified work_dir, set it as the Nextflow working directory
*
* WARNING: changing this directory can lead to high server latency and
* potential disk space limitations. Change with caution! The 'workDir'
* in Nextflow determines the location of intermediate and temporary files.
*/
params.work_dir = (params.containsKey("work_dir") && params.work_dir) ? params.work_dir : "/scratch"
if (methods.check_workdir_permissions(params.work_dir)) {
workDir = params.work_dir
}
}
else {
// If work_dir was specified as a param and exists or can be created, set workDir. Otherwise, let Nextflow's default behavior dictate workDir
if (params.containsKey("work_dir") && params.work_dir && methods.check_workdir_permissions(params.work_dir)) {
workDir = params.work_dir
}
}
}

// Pipeline monitoring and metric files
set_timeline = {
timeline.enabled = true
timeline.file = "${params.log_output_dir}/nextflow-log/timeline.html"
}

set_trace = {
set_pipeline_logs = {
trace.enabled = true
trace.file = "${params.log_output_dir}/nextflow-log/trace.txt"
}

set_report = {
timeline.enabled = true
timeline.file = "${params.log_output_dir}/nextflow-log/timeline.html"

report.enabled = true
report.file = "${params.log_output_dir}/nextflow-log/report.html"
}
}

set_node_config = {
def node_cpus = SysHelper.getAvailCpus()
def node_mem = SysHelper.getAvailMemory().getGiga()
set_process = {
process.cache = params.cache_intermediate_pipeline_steps
}

if (node_cpus == 2 && node_mem >= 3 && node_mem < 5) {
includeConfig "${projectDir}/config/F2.config"
}
else if (node_cpus == 32 && node_mem >= 60 && node_mem < 70) {
includeConfig "${projectDir}/config/F32.config"
}
else if (node_cpus == 72 && node_mem >= 130 && node_mem < 150) {
includeConfig "${projectDir}/config/F72.config"
}
else if (node_cpus == 64 && node_mem >= 900 && node_mem < 1100) {
includeConfig "${projectDir}/config/M64.config"
}
else {
throw new Exception('ERROR: System resources not as expected, unable to assign resources.')
}
set_docker_sudo = {
if (params.containsKey("blcds_cluster_slurm") && (!params.blcds_cluster_slurm)) {
docker.sudo = true
}
}

// Set up env, timeline, trace, and report above.
setup = {
methods.set_docker_urls()
methods.set_log_output_dir()
methods.set_output_dir()
methods.check_permissions(params.log_output_dir)

methods.set_env()
methods.set_resources_allocation()
methods.set_process()
methods.set_timeline()
methods.set_trace()
methods.set_report()
methods.set_node_config()
methods.set_docker_sudo()
methods.set_pipeline_logs()
}
}


methods.setup()

// Enable docker
docker {
enabled = true
sudo = (params.sge_scheduler) ? true : false // Set to true if run on SGE

// Pass user's UID/GID and group IDs to Docker
uid_and_gid = "-u \$(id -u):\$(id -g)"
all_group_ids = "\$(for i in `id --real --groups`; do echo -n \"--group-add=\$i \"; done)"

runOptions = "${uid_and_gid} ${all_group_ids}"
}