Skip to content

Commit

Permalink
Put parallel loop in
Browse files Browse the repository at this point in the history
  • Loading branch information
znicholls committed Jul 10, 2019
1 parent 597b386 commit 9a05a26
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions src/netcdf_scm/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import os.path
import re
import sys
from concurrent.futures import ThreadPoolExecutor
from os import makedirs, walk
from time import gmtime, strftime
from time import gmtime, sleep, strftime

import click
import numpy as np
Expand Down Expand Up @@ -195,26 +196,34 @@ def crunch_files(fnames, dpath):

def _apply_func_to_files_if_dir_matches_regexp(apply_func, search_dir, regexp_to_match):
failures = False

logger.info("Finding directories with files")
total_dirs = len(list([f for _, _, f in walk(search_dir) if f]))
logger.info("Found %s directories with files", total_dirs)
dir_counter = 1
for dirpath, _, filenames in walk(search_dir):
logger.debug("Entering %s", dirpath)
if filenames:
logger.info("Checking directory %s of %s", dir_counter, total_dirs)
dir_counter += 1
if not regexp_to_match.match(dirpath):
logger.debug("Skipping (did not match regexp) %s", dirpath)
continue
logger.info("Attempting to process: %s", filenames)
try:
apply_func(filenames, dirpath)

except Exception: # pylint:disable=broad-except
logger.exception("Failed to process: %s", filenames)
failures = True
futures = []
with ThreadPoolExecutor(max_workers=20) as pool:
for dirpath, _, filenames in walk(search_dir):
logger.debug("Entering %s", dirpath)
if filenames:
if not regexp_to_match.match(dirpath):
logger.debug("Skipping (did not match regexp) %s", dirpath)
continue
logger.info("Attempting to process: %s", filenames)
futures.append(pool.submit(apply_func, filenames, dirpath))
sleep(1)

# could do progress bar in future
# kwargs = {
# 'total': len(futures),
# 'unit': 'it',
# 'unit_scale': True,
# 'leave': True
# }
# # Print out the progress as tasks complete
# for f in tqdm(as_completed(futures), **kwargs):
# pass

for future in futures:
try:
future.result()
except Exception as e:
failures = True

return failures

Expand Down

0 comments on commit 9a05a26

Please sign in to comment.