Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Downloader utility class to use static dask cluster #1161

Merged
merged 23 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
08e2996
update downloader to be static variable
efajardo-nv Aug 29, 2023
e859592
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Aug 29, 2023
5bd447e
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Aug 29, 2023
9e56ddd
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Aug 30, 2023
1ecd371
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Aug 30, 2023
2b79fd6
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 1, 2023
ff36997
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Sep 11, 2023
46bbe5b
make dask cluster static per review feedback
efajardo-nv Sep 11, 2023
a98b0ea
Merge branch 'file-to-df-loader-fix' of https://github.com/efajardo-n…
efajardo-nv Sep 11, 2023
e40af22
get_dask_cluster update
efajardo-nv Sep 11, 2023
45d9d8e
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Sep 12, 2023
53b967b
update downloader unit tests
efajardo-nv Sep 12, 2023
54fc756
add comment
efajardo-nv Sep 12, 2023
18add2e
update test_dfp_file_to_df
efajardo-nv Sep 12, 2023
52f3703
fix module dfp cmd example
efajardo-nv Sep 13, 2023
c485ded
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 13, 2023
44c1f4c
create second downloader in test_get_dask_cluster
efajardo-nv Sep 13, 2023
ed8cfc4
Merge branch 'branch-23.11' of https://github.com/nv-morpheus/Morpheu…
efajardo-nv Sep 18, 2023
e874961
use mutex when getting dask cluster
efajardo-nv Sep 18, 2023
4fa8d12
mutex update
efajardo-nv Sep 18, 2023
9cb2eb8
style fixes
efajardo-nv Sep 18, 2023
f08b27a
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 19, 2023
356d79a
Merge branch 'branch-23.11' into file-to-df-loader-fix
efajardo-nv Sep 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ To run the DFP pipelines with the example datasets within the container, run:
--start_time "2022-08-01" \
--duration "60d" \
--train_users generic \
--input_file "./control_messages/duo_payload_load_training_inference.json"
--input_file "./control_messages/duo_payload_load_train_inference.json"
```

* Azure Training Pipeline
Expand Down
109 changes: 2 additions & 107 deletions morpheus/loaders/file_to_df_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ def file_to_df_loader(control_message: ControlMessage, task: dict):
parser_kwargs = config.get("parser_kwargs", None)
cache_dir = config.get("cache_dir", None)

if not hasattr(file_to_df_loader, "downloader"):
file_to_df_loader.downloader = Downloader()

if (cache_dir is None):
cache_dir = "./.cache"
logger.warning("Cache directory not set. Defaulting to ./.cache")
Expand All @@ -107,109 +104,7 @@ def file_to_df_loader(control_message: ControlMessage, task: dict):
# Overwriting payload with derived data
control_message.payload(MessageMeta(df))

# Optimistically prep the dataframe (Not necessary since this will happen again in process_dataframe, but it
# increases performance significantly)
if (schema.prep_dataframe is not None):
s3_df = schema.prep_dataframe(s3_df)

return s3_df

def get_or_create_dataframe_from_s3_batch(file_name_batch: typing.List[str]) -> typing.Tuple[cudf.DataFrame, bool]:
    """
    Download a batch of remote files into a single DataFrame, using an on-disk pickle cache.

    Relies on closure variables from the enclosing loader: `cache_dir`, `file_type`,
    `filter_null`, `parser_kwargs`, `schema` and `timestamp_column_name`.

    Parameters
    ----------
    file_name_batch : typing.List[str]
        Paths/URLs of the files making up the batch. Must be non-empty.

    Returns
    -------
    typing.Tuple[cudf.DataFrame, bool]
        The combined, schema-processed DataFrame and a flag that is True when the
        result was served from the cache.

    Raises
    ------
    RuntimeError
        If `file_name_batch` is empty.
    ValueError
        If no files could be downloaded.
    """

    if (not file_name_batch):
        raise RuntimeError("No file objects to process")

    file_list = fsspec.open_files(file_name_batch)

    file_system: fsspec.AbstractFileSystem = file_list.fs

    # Create a list of dictionaries that only contains the information we are interested in
    # hashing. `ukey` just hashes all the output of `info()` which is perfect
    hash_data = [{"ukey": file_system.ukey(file_object.path)} for file_object in file_list]

    # MD5 the file metadata so the cache key changes whenever any input file changes
    objects_hash_hex = hashlib.md5(json.dumps(hash_data, sort_keys=True).encode()).hexdigest()

    batch_cache_location = os.path.join(cache_dir, "batches", f"{objects_hash_hex}.pkl")

    # Return the cached result if it exists
    if (os.path.exists(batch_cache_location)):
        output_df = pd.read_pickle(batch_cache_location)
        output_df["origin_hash"] = objects_hash_hex

        return (output_df, True)

    # Cache miss: download every file and convert each one to a DataFrame
    download_method_func = partial(single_object_to_dataframe,
                                   file_type=file_type,
                                   filter_null=filter_null,
                                   parser_kwargs=parser_kwargs)

    download_buckets = file_list

    # Loop over dataframes and concat into one
    try:
        dfs = file_to_df_loader.downloader.download(download_buckets, download_method_func)
    except Exception:
        # logger.exception already records the traceback; no exc_info needed
        logger.exception("Failed to download logs.")
        raise

    if (dfs is None or len(dfs) == 0):
        raise ValueError("No logs were downloaded")

    output_df: pd.DataFrame = pd.concat(dfs)
    output_df = process_dataframe(df_in=output_df, input_schema=schema)

    # Finally sort by timestamp and then reset the index
    output_df.sort_values(by=[timestamp_column_name], inplace=True)
    output_df.reset_index(drop=True, inplace=True)

    # Save dataframe to cache future runs
    os.makedirs(os.path.dirname(batch_cache_location), exist_ok=True)

    try:
        output_df.to_pickle(batch_cache_location)
    except Exception:
        # Best-effort cache write: a failure only costs a re-download on the next run
        logger.warning("Failed to save batch cache. Skipping cache for this batch.", exc_info=True)

    output_df["origin_hash"] = objects_hash_hex

    return (output_df, False)

def convert_to_dataframe(filenames: typing.List[str]):
    """
    Convert a list of file names into a single pandas DataFrame, logging timing stats.

    Parameters
    ----------
    filenames : typing.List[str]
        Files to download and convert. An empty list short-circuits to None.

    Returns
    -------
    pd.DataFrame or None
        The combined DataFrame, or None when `filenames` is empty.
    """

    if (not filenames):
        return None

    start_time = time.time()

    try:
        output_df, cache_hit = get_or_create_dataframe_from_s3_batch(filenames)

        duration = (time.time() - start_time) * 1000.0

        # Guard with isEnabledFor so the row counts/rate are only computed when needed.
        # NOTE(review): rate divides by `duration`; a coarse clock could make this 0 — confirm
        if (output_df is not None and logger.isEnabledFor(logging.DEBUG)):
            logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s",
                         len(output_df),
                         "hit" if cache_hit else "miss",
                         duration,
                         len(output_df) / (duration / 1000.0))

        return output_df
    except Exception:
        logger.exception("Error while converting S3 buckets to DF.")
        raise

pdf = convert_to_dataframe(files)

df = cudf.from_pandas(pdf)

# Overwriting payload with derived data
control_message.payload(MessageMeta(df))
finally:
controller.close()

return control_message
19 changes: 10 additions & 9 deletions morpheus/utils/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ class Downloader:
The heartbeat interval to use when using dask or dask_thread.
"""

_dask_cluster = None

def __init__(self,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD,
dask_heartbeat_interval: str = "30s"):

self._merlin_distributed = None
self._dask_cluster = None
self._dask_heartbeat_interval = dask_heartbeat_interval

download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method)
Expand Down Expand Up @@ -96,23 +97,20 @@ def get_dask_cluster(self):
dask_cuda.LocalCUDACluster
"""

if self._dask_cluster is None:
import dask
import dask.distributed
if Downloader._dask_cluster is None:
import dask_cuda.utils

logger.debug("Creating dask cluster...")

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})
n_workers = dask_cuda.utils.get_n_gpus()
threads_per_worker = mp.cpu_count() // n_workers

self._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers,
threads_per_worker=threads_per_worker)

logger.debug("Creating dask cluster... Done. Dashboard: %s", self._dask_cluster.dashboard_link)
logger.debug("Creating dask cluster... Done. Dashboard: %s", Downloader._dask_cluster.dashboard_link)

return self._dask_cluster
return Downloader._dask_cluster

def get_dask_client(self):
"""
Expand All @@ -124,6 +122,9 @@ def get_dask_client(self):
"""
import dask.distributed

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})

if (self._merlin_distributed is None):
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pandas as pd
import pytest

import morpheus.utils.downloader
from _utils import TEST_DIRS
from _utils.dataset_manager import DatasetManager
from morpheus.common import FileTypes
Expand Down Expand Up @@ -99,9 +100,11 @@ def test_constructor(config: Config):


# pylint: disable=redefined-outer-name
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.usefixtures("restore_environ")
@pytest.mark.parametrize('dl_type', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@pytest.mark.parametrize('use_convert_to_dataframe', [True, False])
@pytest.mark.usefixtures("reload_modules")
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.distributed.Client')
@mock.patch('dask_cuda.LocalCUDACluster')
Expand Down
23 changes: 13 additions & 10 deletions tests/test_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import fsspec
import pytest

import morpheus.utils.downloader
from _utils import TEST_DIRS
from _utils import import_or_skip
from morpheus.utils.downloader import DOWNLOAD_METHODS_MAP
Expand Down Expand Up @@ -87,29 +88,30 @@ def test_constructor_invalid_dltype(use_env: bool):
Downloader(**kwargs)


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.parametrize("dl_method", ["dask", "dask_thread"])
@mock.patch('dask.config')
@pytest.mark.usefixtures("reload_modules")
@mock.patch('dask_cuda.LocalCUDACluster')
def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, mock_dask_config: mock.MagicMock, dl_method: str):
def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.return_value = mock_dask_cluster
downloader = Downloader(download_method=dl_method)
assert downloader.get_dask_cluster() is mock_dask_cluster

mock_dask_config.set.assert_called_once()
# call again then assert that cluster was only created once
downloader.get_dask_cluster()

mock_dask_cluster.assert_called_once()


@mock.patch('dask.config')
@mock.patch('dask_cuda.LocalCUDACluster')
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.parametrize('dl_method', ["dask", "dask_thread"])
def test_close(mock_dask_cluster: mock.MagicMock, mock_dask_config: mock.MagicMock, dl_method: str):
@pytest.mark.usefixtures("reload_modules")
@mock.patch('dask_cuda.LocalCUDACluster')
def test_close(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.return_value = mock_dask_cluster
downloader = Downloader(download_method=dl_method)
assert downloader.get_dask_cluster() is mock_dask_cluster

mock_dask_config.set.assert_called_once()

mock_dask_cluster.close.assert_not_called()
downloader.close()

Expand All @@ -127,7 +129,8 @@ def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str):
mock_dask_cluster.close.assert_not_called()


@pytest.mark.usefixtures("restore_environ")
@pytest.mark.reload_modules(morpheus.utils.downloader)
@pytest.mark.usefixtures("reload_modules", "restore_environ")
@pytest.mark.parametrize('dl_method', ["single_thread", "multiprocess", "multiprocessing", "dask", "dask_thread"])
@mock.patch('multiprocessing.get_context')
@mock.patch('dask.config')
Expand Down