
Commit 6a59e85
flake8
esseivaju committed May 26, 2022
1 parent d2af3f6 commit 6a59e85
Showing 1 changed file with 37 additions and 23 deletions.
src/raythena/drivers/esdriver.py: 37 additions & 23 deletions (60 changed lines)
@@ -9,10 +9,9 @@
from math import ceil
from queue import Empty, Queue
from socket import gethostname
-from typing import Any, Dict, Iterator, List, Set, Tuple, Union
+from typing import Any, Dict, Iterator, List, Set, Tuple, Union, Iterable
from subprocess import DEVNULL, Popen


import ray
from ray.exceptions import RayActorError
from raythena.actors.esworker import ESWorker
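Editor's note: `Iterable` is added here because, later in this commit, `hits_merge_transform` is re-annotated to accept any iterable of paths (its caller passes a `map` object) rather than a concrete `List[str]`. A minimal standalone sketch of that distinction, with illustrative function names that are not part of raythena:

```python
from typing import Iterable, List

def consume_list(paths: List[str]) -> int:
    return len(paths)                      # needs a real list (supports len/indexing)

def consume_iterable(paths: Iterable[str]) -> int:
    return sum(1 for _ in paths)           # works for any iterable, consumed lazily

paths = map(str.strip, [" a.root ", " b.root "])
# consume_list(paths) would be flagged by a type checker: a map object is not a List
print(consume_iterable(paths))             # 2
```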
@@ -214,7 +213,7 @@ def fetch_event_ranges(self, actor_id: str, n: int) -> List[EventRange]:
return ranges

def process_event_ranges_update(
-self, actor_id: str, event_ranges_update: Union[dict, EventRangeUpdate]
+self, actor_id: str, event_ranges_update: Union[dict, EventRangeUpdate]
) -> Union[Tuple[EventRangeUpdate, EventRangeUpdate], None]:
"""
Update the event ranges status according to the range update.
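Editor's note: the signature above accepts either a raw `dict` or an already-built `EventRangeUpdate`. A common way to handle such a union is to narrow it once at the top of the function; the sketch below is a generic illustration only, and the `from_dict` helper is a hypothetical stand-in, not taken from this diff:

```python
from typing import Union

class EventRangeUpdate:                      # stand-in for raythena's class
    def __init__(self, ranges=None):
        self.ranges = ranges or {}

    @classmethod
    def from_dict(cls, data: dict) -> "EventRangeUpdate":   # hypothetical helper
        return cls(dict(data))

def process_update(update: Union[dict, EventRangeUpdate]) -> EventRangeUpdate:
    # Narrow the union once, then work with a single concrete type below.
    if isinstance(update, dict):
        update = EventRangeUpdate.from_dict(update)
    return update
```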
@@ -368,7 +367,8 @@ def __init__(self, config: Config, session_dir: str) -> None:
# TODO removing stdout on the root logger will also disable ray logging and collected stdout from actors
disable_stdout_logging()

self._logger.debug(f"Raythena initializing, version {__version__} running Ray {ray.__version__} on {gethostname()}")
self._logger.debug(
f"Raythena initializing, version {__version__} running Ray {ray.__version__} on {gethostname()}")

# self.cpu_monitor = CPUMonitor(os.path.join(workdir, "cpu_monitor_driver.json"))
# self.cpu_monitor.start()
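Editor's note: several hunks in this commit, like the wrapped `self._logger.debug(...)` call above, only reflow long f-string log calls to satisfy flake8's line-length check. Two equivalent ways to do that, shown with placeholder values rather than raythena's variables:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
version, ray_version, host = "1.0", "1.12.0", "nid001"   # placeholder values

# Style used in this commit: break after the opening parenthesis.
logger.debug(
    f"Raythena initializing, version {version} running Ray {ray_version} on {host}")

# Equivalent alternative: implicit concatenation of adjacent f-strings.
logger.debug(f"Raythena initializing, version {version} "
             f"running Ray {ray_version} on {host}")
```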
@@ -475,7 +475,8 @@ def create_actors(self) -> None:
event_ranges = self.bookKeeper.fetch_event_ranges(actor_id, events_per_actor)
if event_ranges:
kwargs['event_ranges'] = event_ranges
self._logger.debug(f"Prefetched job {job['PandaID']} and {len(event_ranges)} event ranges for {actor_id}")
self._logger.debug(
f"Prefetched job {job['PandaID']} and {len(event_ranges)} event ranges for {actor_id}")

actor = ESWorker.options(resources={node_constraint: 1}).remote(**kwargs)
self.actors[actor_id] = actor
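Editor's note: `ESWorker.options(resources={node_constraint: 1}).remote(**kwargs)` above pins each actor to a node that advertises a custom Ray resource. A self-contained sketch of that Ray pattern, with an assumed resource name and a toy actor that are not raythena's:

```python
import ray

# Assumption for the sketch: this node registers a custom resource named "merge_slot".
ray.init(resources={"merge_slot": 2})

@ray.remote
class Worker:
    def __init__(self, worker_id: str):
        self.worker_id = worker_id

    def ping(self) -> str:
        return self.worker_id

# The actor is only scheduled on a node that can provide one unit of "merge_slot".
worker = Worker.options(resources={"merge_slot": 1}).remote("worker_0")
print(ray.get(worker.ping.remote()))
```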
@@ -852,7 +853,7 @@ def create_tar_file(self, range_list: list) -> Dict[str, List[Dict]]:
try:
# create tar file looping over event ranges
return_code = self.hits_merge_transform(map(lambda x: x['path'], range_list), file_path)
-if return_code != 0:
+if return_code != 0:
raise Exception(f"Merged transform failed to execute with return code {return_code}")
file_fsize = os.path.getsize(file_path)
# calculate adler32 checksum
@@ -888,7 +889,8 @@ def calc_adler32(self, file_name: str) -> str:
val += 2 ** 32
return hex(val)[2:10].zfill(8).lower()

-def create_harvester_data(self, PanDA_id: str, file_path: str, file_chksum: str, file_fsize: int, range_list: list) -> Dict[str, List[Dict]]:
+def create_harvester_data(self, PanDA_id: str, file_path: str, file_chksum: str, file_fsize: int,
+range_list: list) -> Dict[str, List[Dict]]:
"""
create data structure for telling Harvester what files are merged and ready to process
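Editor's note: the hunk above ends inside `calc_adler32`, whose visible tail (`val += 2 ** 32`, `hex(val)[2:10].zfill(8).lower()`) folds a possibly signed checksum into an 8-character lowercase hex string. A standalone sketch of that kind of computation with `zlib.adler32`, not the project's exact implementation:

```python
import zlib

def adler32_hex(file_name: str) -> str:
    """Adler-32 of a file, formatted as an 8-character lowercase hex string."""
    val = 1  # Adler-32 starting value
    with open(file_name, "rb") as f:
        for chunk in iter(lambda: f.read(64 * 1024), b""):
            val = zlib.adler32(chunk, val)
    # On Python 3, zlib.adler32 already returns an unsigned value; the guard below
    # mirrors the `val += 2 ** 32` correction visible in the diff for signed results.
    if val < 0:
        val += 2 ** 32
    return hex(val)[2:10].zfill(8).lower()
```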
@@ -916,8 +918,17 @@ def create_harvester_data(self, PanDA_id: str, file_path: str, file_chksum: str,
if return_list:
return_dict = {PanDA_id: return_list}
return return_dict

-def hits_merge_transform(self, input_files: List[str], output_file):

+def hits_merge_transform(self, input_files: Iterable[str], output_file: str) -> int:
"""
Args:
input_files:
output_file:
Returns:
int:
"""
if not input_files:
return
tmp_dir = tempfile.mkdtemp()
@@ -928,23 +939,22 @@
cmd = str()
cmd += "export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;"
cmd += "export thePlatform=\"${SLURM_SPANK_SHIFTER_IMAGEREQUEST}\";"
cmd += f"source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh --swtype shifter -c $thePlatform -d -s none -r \"{container_script}\" -e \"--clearenv\";RETURN_VAL=$?; rm -r {tmp_dir};exit $RETURN_VAL;"
cmd += "source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh --swtype shifter -c $thePlatform -d -s none"
cmd += f" -r \"{container_script}\" -e \"--clearenv\";RETURN_VAL=$?; rm -r {tmp_dir};exit $RETURN_VAL;"
sub_process = Popen(cmd,
-stdin=DEVNULL,
-stdout=DEVNULL,
-stderr=DEVNULL,
-shell=True,
-cwd=tmp_dir,
-close_fds=True)
+stdin=DEVNULL,
+stdout=DEVNULL,
+stderr=DEVNULL,
+shell=True,
+cwd=tmp_dir,
+close_fds=True)

while sub_process.poll() is None:
time.sleep(5)
self._logger.debug(f"Merge transform finished with return code {sub_process.poll()}")
return sub_process.returncode



-def tar_es_output(self, skip_time_check = False) -> None:
+def tar_es_output(self, skip_time_check=False) -> None:
"""
Get from bookKeeper the event ranges arranged by input file that need to be put into output tar files
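Editor's note: `hits_merge_transform` above builds one shell command string, launches it detached from stdio, and polls the child process until it exits. Stripped of the ATLAS/Shifter setup, the pattern looks like the following simplified sketch (not the method itself):

```python
import time
from subprocess import DEVNULL, Popen

def run_and_wait(cmd: str, workdir: str, poll_interval: float = 5.0) -> int:
    """Launch a shell command with stdio detached and poll until it finishes."""
    proc = Popen(cmd,
                 stdin=DEVNULL,
                 stdout=DEVNULL,
                 stderr=DEVNULL,
                 shell=True,
                 cwd=workdir,
                 close_fds=True)
    while proc.poll() is None:        # poll() returns None while the child is still running
        time.sleep(poll_interval)
    return proc.returncode
```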
@@ -966,14 +976,17 @@
self.ranges_to_tar.extend(ranges_to_tar)

try:
-self.running_tar_threads.update({self.tar_executor.submit(self.create_tar_file, range_list): range_list for range_list in self.ranges_to_tar})
+self.running_tar_threads.update(
+{self.tar_executor.submit(self.create_tar_file, range_list): range_list for range_list in
+self.ranges_to_tar})
self.total_tar_tasks += len(self.ranges_to_tar)
self.ranges_to_tar = list()
except Exception as exc:
self._logger.warn(f"tar_es_output: Exception {exc} when submitting tar subprocess")
pass

self._logger.debug(f"tar_es_output: #tasks in queue : {len(self.running_tar_threads)}, #total tasks submitted since launch: {self.total_tar_tasks}")
self._logger.debug(
f"tar_es_output: #tasks in queue : {len(self.running_tar_threads)}, #total tasks submitted since launch: {self.total_tar_tasks}")

def get_tar_results(self) -> None:
"""
@@ -986,7 +999,8 @@
"""
if len(self.running_tar_threads) == 0:
return
-done, not_done = concurrent.futures.wait(self.running_tar_threads, timeout=0.001, return_when=concurrent.futures.FIRST_COMPLETED)
+done, not_done = concurrent.futures.wait(self.running_tar_threads, timeout=0.001,
+return_when=concurrent.futures.FIRST_COMPLETED)
final_update = EventRangeUpdate()
for future in done:
try:
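Editor's note: the last two hunks pair up. `tar_es_output` submits one `create_tar_file` task per range list to a thread pool and keeps a `{future: range_list}` map, while `get_tar_results` collects whatever has completed via `concurrent.futures.wait` with a tiny timeout and `FIRST_COMPLETED`. A compact, self-contained sketch of that pattern, with a placeholder task function and data:

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def create_tar_file(range_list):              # placeholder for the real merge/tar task
    return f"tarred {len(range_list)} ranges"

executor = ThreadPoolExecutor(max_workers=4)
ranges_to_tar = [["r1", "r2"], ["r3"], ["r4", "r5", "r6"]]

# tar_es_output side: submit one task per range list, keyed by its future.
running = {executor.submit(create_tar_file, rl): rl for rl in ranges_to_tar}

# get_tar_results side: in the driver this runs inside a periodic loop, so the short
# timeout keeps each call non-blocking; here we just loop until everything finishes.
while running:
    done, not_done = concurrent.futures.wait(running, timeout=0.001,
                                             return_when=concurrent.futures.FIRST_COMPLETED)
    for future in done:
        range_list = running.pop(future)
        print(range_list, "->", future.result())

executor.shutdown(wait=True)
```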
