#!/bin/bash
# vim: ft=python
# This workflow expects to be run in an output directory and will
# produce {foo}_md5sums.txt by checksumming all the regular files
# under config['input_dir']. As with rsync, a /./ component in the input
# path splits it into a base path and a prefix; the paths recorded in the
# md5sums.txt are then relative to the base path, and so begin with the
# prefix.
#
# A checkpoint rule batches up the files and saves the batch list
# into {foo}_batches.json.
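#
# Example (hypothetical paths, and assuming snakerun_drmaa from
# shell_helper_functions.sh forwards its arguments on to snakemake):
#
#   ./Snakefile.checksummer --config input_dir=/data/runs/./run1
#
# run from within the output directory would produce run1_md5sums.txt,
# with every path recorded relative to /data/runs.
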
"""true" ### Begin shell script part
set -u
source "`dirname $0`"/shell_helper_functions.sh
snakerun_drmaa "$0" "$@"
"exit""" ### End of shell script part
#!/usr/bin/env snakemake
import os
import json
from pprint import pprint, pformat

def split_input_dir(idir=config['input_dir']):
    """Return config['input_dir'] split into (base, prefix) at the last /./
       component, or (idir, "") if the path has no /./ in it.
    """
    idir = idir.rstrip("/")
    path_bits = idir.split("/")
    path_bits.reverse()

    try:
        dot_pos = path_bits.index(".")
    except ValueError:
        dot_pos = None

    if not dot_pos:
        # No /./ in the path, or it's right at the end
        return (idir, "")

    # The extended slices read the reversed list backwards, putting the
    # path bits back into their original order.
    return ( "/".join(path_bits[:dot_pos:-1]),
             "/".join(path_bits[dot_pos-1::-1]) )

def scan_for_batches(base_p, prefix_p, batch_size=config.get('batch_size', 100)):
    """The main part of the whole thing. Find all the files in the
       input directory and return them as a dict of lists of at most
       batch_size files each.
       The order and content of the lists should be stable as long as the
       file names and batch size are the same.
       The list sizes should be balanced, so if there are 101 files we should
       get 51+50 not 100+1.
       The files in the lists should be mixed up in an attempt to get a range
       of sizes per list. Filling the lists in a round-robin manner satisfies
       this and the previous criterion.
    """
    # 1 - Get a list of all files with paths starting prefix_p/
    # 2 - Make sure it's sorted
    # 3 - Call batchlist()
    res = []
    for dirpath, dirnames, filenames in os.walk(os.path.join(base_p, prefix_p)):
        # Every dirpath from the walk must start with the base path; strip
        # it off to get paths relative to the base.
        assert dirpath[:len(base_p)] == base_p
        reldirpath = dirpath[len(base_p):]
        if base_p:
            reldirpath = reldirpath.lstrip('/')
        res.extend([os.path.join(reldirpath, f) for f in filenames])
    res.sort()

    return batchlist(res, batch_size, 3)
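
# For instance, with base_p="/data/runs" and prefix_p="run1" (hypothetical
# names), a file /data/runs/run1/fastq/x.fq is recorded as "run1/fastq/x.fq",
# ready to be checksummed relative to the base path.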

def batchlist(l, batch_size, min_pad=0):
    """Helper function for the above.
    """
    # 1 - Work out how many batches we need, and hence the key width
    #     (00 01 02 ... or 000 001 002 ... depending on the batch count)
    # 2 - Fill the lists round-robin
    # 3 - Profit
    batches_needed = ( (len(l) - 1) // batch_size ) + 1

    # Round robin fill (lol = list of lists)
    lol = [ [] for __ in range(batches_needed) ]
    for i, x in enumerate(l):
        lol[i % batches_needed].append(x)

    # Return a dict rather than a list so we can zero-pad the keys
    pad_size = max(len(str(batches_needed - 1)), min_pad)
    return { f"{k:0{pad_size}d}": v for k, v in enumerate(lol) }

## End of functions ## Leave this comment in place to help the unit tests.

# Determine the output prefix.
BASE_PATH, PREFIX_PATH = split_input_dir()

if 'output_prefix' in config:
    # This only affects the name of the md5 file
    if config['output_prefix']:
        OUTPUT_PREFIX = config['output_prefix'] + "_"
    else:
        OUTPUT_PREFIX = ""
else:
    OUTPUT_PREFIX = PREFIX_PATH and (PREFIX_PATH.replace('/','_') + "_")
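
# For example, with input_dir=/data/runs/./run1/fastq and no output_prefix
# set (hypothetical paths), PREFIX_PATH is "run1/fastq", OUTPUT_PREFIX becomes
# "run1_fastq_", and the final output file is run1_fastq_md5sums.txt.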

localrules: main, gen_batches, combine_batches

rule main:
    input: f"{OUTPUT_PREFIX}md5sums.txt"

checkpoint gen_batches:
    output:
        json = f"{OUTPUT_PREFIX}batches.json"
    run:
        batches = scan_for_batches(BASE_PATH, PREFIX_PATH)
        with open(str(output.json), "w") as jfh:
            json.dump(batches, jfh)

rule md5sum_batch:
    output:
        batch = temp(f"{OUTPUT_PREFIX}md5sums_{{b}}.txt")
    input:
        json = f"{OUTPUT_PREFIX}batches.json"
    run:
        # Un-quieten Snakemake's logger (an internal API) so the md5sum
        # commands below show up in the log.
        logger.quiet.discard('all')
        with open(str(input.json)) as bffh:
            batches = json.load(bffh)
        my_batch = batches[wildcards.b]
        shell("(cd {BASE_PATH:q} ; md5sum {my_batch:q}) > {output.batch}")

def i_combine_batches(wildcards=None):
    batches_file = checkpoints.gen_batches.get().output.json
    with open(batches_file) as bffh:
        batches = json.load(bffh)

    # One file per batch, names based on the dict keys and always combined
    # in order, although I have to sort the result anyway so maybe that's
    # pointless.
    return [ f"{OUTPUT_PREFIX}md5sums_{b}.txt" for b in sorted(batches) ]

rule combine_batches:
    output: f"{OUTPUT_PREFIX}md5sums.txt"
    input: i_combine_batches
    # Sort on the second field (the file path) so the final md5sums.txt is
    # deterministic regardless of the order the batches completed in, and
    # write via a .part file so an interrupted run never leaves a
    # half-written output.
    shell:
        "sort -k2 {input} > {output}.part ; mv {output}.part {output}"