Skip to content

Commit

Permalink
Run Directory Uploader (#101)
Browse files Browse the repository at this point in the history
Run Directory Uploader

Added a callback that uploads the run directory to various cloud providers. Depends on the LibCloud plugin. An S3-specific client was not used because Azure Blob Storage is not S3-compatible.

Closes #98.
  • Loading branch information
ravi-mosaicml authored Dec 3, 2021
1 parent 67160d4 commit 71347a6
Show file tree
Hide file tree
Showing 9 changed files with 460 additions and 9 deletions.
2 changes: 2 additions & 0 deletions composer/callbacks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
from composer.callbacks.callback_hparams import GradMonitorHparams as GradMonitorHparams
from composer.callbacks.callback_hparams import LRMonitorHparams as LRMonitorHparams
from composer.callbacks.callback_hparams import MemoryMonitorHparams as MemoryMonitorHparams
from composer.callbacks.callback_hparams import RunDirectoryUploaderHparams as RunDirectoryUploaderHparams
from composer.callbacks.callback_hparams import SpeedMonitorHparams as SpeedMonitorHparams
from composer.callbacks.callback_hparams import TorchProfilerHparams as TorchProfilerHparams
from composer.callbacks.lr_monitor import LRMonitor as LRMonitor
from composer.callbacks.run_directory_uploader import RunDirectoryUploader as RunDirectoryUploader
from composer.callbacks.speed_monitor import SpeedMonitor as SpeedMonitor
from composer.callbacks.torch_profiler import TorchProfiler as TorchProfiler
63 changes: 62 additions & 1 deletion composer/callbacks/callback_hparams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from __future__ import annotations

import abc
import textwrap
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, List
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import yahp as hp

Expand All @@ -16,6 +17,7 @@
from composer.callbacks.grad_monitor import GradMonitor
from composer.callbacks.lr_monitor import LRMonitor
from composer.callbacks.memory_monitor import MemoryMonitor
from composer.callbacks.run_directory_uploader import RunDirectoryUploader
from composer.callbacks.speed_monitor import SpeedMonitor
from composer.callbacks.torch_profiler import TorchProfiler

Expand Down Expand Up @@ -153,3 +155,62 @@ class TorchProfilerHparams(CallbackHparams):
def initialize_object(self) -> TorchProfiler:
    """Build the ``TorchProfiler`` callback described by these hyperparameters.

    Returns:
        TorchProfiler: A profiler constructed with every dataclass field of
        this object forwarded as a keyword argument.
    """
    # The callback class is imported here, inside the method body.
    from composer.callbacks.torch_profiler import TorchProfiler

    profiler_kwargs = asdict(self)
    return TorchProfiler(**profiler_kwargs)


@dataclass
class RunDirectoryUploaderHparams(CallbackHparams):
    """:class:`~composer.callbacks.run_directory_uploader.RunDirectoryUploader` hyperparameters.

    See :class:`~composer.callbacks.run_directory_uploader.RunDirectoryUploader` for documentation.
    """

    # Multi-line help strings open with ``"""\`` so every line shares the same
    # indentation and ``textwrap.dedent`` can strip it from the rendered text.
    provider: str = hp.required("Cloud provider to use.")
    container: str = hp.required("The name of the container (i.e. bucket) to use.")
    object_name_prefix: Optional[str] = hp.optional(
        textwrap.dedent("""\
            A prefix to prepend to all object keys.
            An object's key is this prefix combined with its path relative to the run directory.
            If the prefix is non-empty, a trailing slash ('/') will
            be added if necessary. If not specified, then the prefix defaults to the run directory. To disable prefixing,
            set to the empty string."""),
        default=None)
    key: Optional[str] = hp.optional(
        textwrap.dedent("""\
            API key or username to use to connect to the provider. For security, do NOT hardcode the key in the YAML.
            Instead, please specify via CLI arguments, or even better, environment variables."""),
        default=None)
    secret: Optional[str] = hp.optional(
        textwrap.dedent("""\
            API secret to use to connect to the provider. For security, do NOT hardcode the secret in the YAML.
            Instead, please specify via CLI arguments, or even better, environment variables."""),
        default=None)
    region: Optional[str] = hp.optional("Cloud region to use", default=None)
    host: Optional[str] = hp.optional("Override hostname for connections", default=None)
    port: Optional[int] = hp.optional("Override port for connections", default=None)
    num_concurrent_uploads: int = hp.optional("Maximum number of concurrent uploads. Defaults to 4.", default=4)
    use_procs: bool = hp.optional(
        "Whether to perform file uploads in background processes (as opposed to threads). Defaults to True.",
        default=True)
    upload_staging_folder: Optional[str] = hp.optional(
        "Staging folder for uploads. If not specified, will use a temporary directory.", default=None)
    extra_init_kwargs: Dict[str, Any] = hp.optional(
        "Extra keyword arguments to pass into the constructor for the specified provider.", default_factory=dict)
    upload_every_n_batches: int = hp.optional(
        textwrap.dedent("""\
            Interval at which to scan the run directory for changes and to
            queue uploads of files. Uploads are also queued at the end of the epoch. Defaults to every 100 batches."""),
        default=100)

    def initialize_object(self) -> RunDirectoryUploader:
        """Build the ``RunDirectoryUploader`` callback described by these hyperparameters.

        The connection fields (``key``, ``secret``, ``host``, ``port``,
        ``region``) are collected into the provider keyword arguments only
        when they are not ``None``. ``extra_init_kwargs`` is merged in
        afterwards, so an entry there overrides a connection field of the
        same name.

        Returns:
            RunDirectoryUploader: The configured callback.
        """
        from composer.callbacks.run_directory_uploader import RunDirectoryUploader

        provider_init_kwargs: Dict[str, Any] = {}
        for field_name in ("key", "secret", "host", "port", "region"):
            value = getattr(self, field_name)
            # Unset fields are omitted rather than forwarded as ``None``.
            if value is not None:
                provider_init_kwargs[field_name] = value
        # Applied last so user-supplied extras take precedence.
        provider_init_kwargs.update(self.extra_init_kwargs)

        return RunDirectoryUploader(
            provider=self.provider,
            container=self.container,
            object_name_prefix=self.object_name_prefix,
            num_concurrent_uploads=self.num_concurrent_uploads,
            upload_staging_folder=self.upload_staging_folder,
            use_procs=self.use_procs,
            provider_init_kwargs=provider_init_kwargs,
            upload_every_n_batches=self.upload_every_n_batches,
        )
Loading

0 comments on commit 71347a6

Please sign in to comment.