Don't init dist when world_size is 1 (#311)
* cleanup dist init

* specify node rank

* rewrite _get_distributed_config_var for better error handling

* change a warning to an error

* woops

* dummy commit to trigger jenkins

* Update composer/utils/dist.py

Co-authored-by: ravi-mosaicml <ravi@mosaicml.com>

* woops

* remove print statement

Co-authored-by: ravi-mosaicml <ravi@mosaicml.com>
2 people authored and A-Jacobson committed Feb 10, 2022
1 parent 85c82ca commit 8989b3c
Showing 3 changed files with 46 additions and 39 deletions.
17 changes: 15 additions & 2 deletions composer/cli/launcher.py
@@ -39,6 +39,11 @@ def get_parser():
                         "Specifying a base_rank B and an nproc N will spawn processes "
                         "with global ranks [B, B+1, ... B+N-1]. Defaults to 0 (single-node "
                         "operation).")
+    parser.add_argument("--node_rank",
+                        type=int,
+                        default=-1,
+                        help="The rank of this node. Set to -1 to assume that all nodes have "
+                        "the same number of processes, and calculate accordingly. Defaults to -1.")
     parser.add_argument("--master_addr",
                         type=str,
                         default="127.0.0.1",
@@ -88,11 +93,17 @@ def parse_args():
     if args.world_size == -1:
         args.world_size = args.nproc
 
+    if args.node_rank == -1:
+        if args.base_rank % args.nproc != 0:
+            raise ValueError("node_rank not specified, but unable to infer since nodes appear to "
+                             "have different amounts of processes.")
+        args.node_rank = args.base_rank // args.nproc
+
     return args
 
 
-def launch_processes(nproc: int, world_size: int, base_rank: int, master_addr: str, master_port: Optional[int],
-                     module_mode: bool, run_directory: Optional[str], training_script: str,
+def launch_processes(nproc: int, world_size: int, base_rank: int, node_rank: int, master_addr: str,
+                     master_port: Optional[int], module_mode: bool, run_directory: Optional[str], training_script: str,
                      training_script_args: List[Any]) -> Set[subprocess.Popen]:
     log.info("Starting DDP on local node for global_rank(%s-%s)", base_rank, base_rank + nproc - 1)
     processes = []
@@ -120,6 +131,7 @@ def launch_processes(nproc: int, world_size: int, base_rank: int, master_addr: s
         current_env["WORLD_SIZE"] = str(world_size)
         current_env["LOCAL_RANK"] = str(local_rank)
         current_env["LOCAL_WORLD_SIZE"] = str(nproc)
+        current_env["NODE_RANK"] = str(node_rank)
         current_env["MASTER_ADDR"] = master_addr
         current_env["MASTER_PORT"] = str(master_port)
         current_env["COMPOSER_RUN_DIRECTORY"] = run_directory
@@ -255,6 +267,7 @@ def main():
     processes = launch_processes(nproc=args.nproc,
                                  world_size=args.world_size,
                                  base_rank=args.base_rank,
+                                 node_rank=args.node_rank,
                                  master_addr=args.master_addr,
                                  master_port=args.master_port,
                                  module_mode=args.module_mode,
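
The node_rank inference added to parse_args() above is plain integer arithmetic: when --node_rank is left at its default of -1, the launcher assumes every node runs the same number of processes and derives the node rank as base_rank // nproc. A minimal standalone sketch of that logic, with hypothetical values for illustration:

    # Sketch of the inference performed in parse_args() above; not the launcher itself.
    def infer_node_rank(base_rank: int, nproc: int) -> int:
        if base_rank % nproc != 0:
            raise ValueError("node_rank not specified, but unable to infer since nodes appear to "
                             "have different amounts of processes.")
        return base_rank // nproc

    # Example: two nodes with 8 processes each. The second node is launched with
    # --base_rank 8 --nproc 8, so its inferred node rank is 8 // 8 = 1, and
    # launch_processes() exports NODE_RANK=1 to each local worker it spawns.
    assert infer_node_rank(base_rank=0, nproc=8) == 0
    assert infer_node_rank(base_rank=8, nproc=8) == 1
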
2 changes: 0 additions & 2 deletions composer/core/engine.py
@@ -191,8 +191,6 @@ def _compile(
         algorithms = sorted(algorithms_to_run,
                             key=lambda x: not isinstance(x, SelectiveBackprop) and not isinstance(x, StochasticDepth))
 
-        print(event, algorithms)
-
         if event.is_after_event:
             """Establish a FILO queue of algorithms before_ and after_ an event.
66 changes: 31 additions & 35 deletions composer/utils/dist.py
@@ -19,22 +19,30 @@ def _get_distributed_config_var(env_var: str,
                                 default: int,
                                 fetch_fn_name: Optional[str] = None) -> int:
     if not dist.is_available():
-        warnings.warn(
-            f"DistributedDefaultValueWarning: Torch distributed is not available; returning {default} for {human_name}")
+        warnings.warn("DistributedDefaultValueWarning: Torch distributed is not available; "
+                      f"returning {default} for {human_name}")
         return default
 
-    if not env_var in os.environ:
-        warnings.warn(f"DistributedDefaultValueWarning: {env_var} env var not set"
-                      f"{' and process group not initialized' if fetch_fn_name is not None else ''}; "
-                      f"returning {default} for {human_name}.")
-        env_value = default
-    else:
-        env_value = int(os.environ[env_var])
-
     if dist.is_initialized() and fetch_fn_name is not None:
-        assert env_value == int(getattr(dist, fetch_fn_name)()), "invariant violation"
+        dist_value = int(getattr(dist, fetch_fn_name)())
+        if env_var in os.environ:
+            env_value = int(os.environ[env_var])
+            if dist_value != env_value:
+                raise RuntimeError("Torch distributed has been initialized with a value of "
+                                   f"{dist_value} for {human_name}, but environment variable "
+                                   f"{env_var} has value {env_value}.")
+        return dist_value
 
-    return env_value
+    if env_var in os.environ:
+        return int(os.environ[env_var])
+
+    if dist.is_initialized():
+        raise RuntimeError("Torch distributed is initialized but environment variable "
+                           f"{env_var} is not set.")
+
+    warnings.warn(f"DistributedDefaultValueWarning: {env_var} env var not set and Torch "
+                  f"distributed not initialized; returning {default} for {human_name}.")
+    return default
 
 
 def get_world_size() -> int:
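
The rewritten _get_distributed_config_var above resolves a value in a fixed order: an initialized torch.distributed process group wins (and must agree with the environment variable if both are present), then the environment variable set by the launcher, then the default with a warning. A simplified, hypothetical mirror of that precedence (not the module's actual helper; it takes a callable instead of fetch_fn_name and omits the initialized-but-env-var-missing error path):

    import os
    import warnings

    import torch.distributed as dist

    def resolve(env_var: str, human_name: str, default: int, fetch_fn=None) -> int:
        # 1. An initialized process group is authoritative; a conflicting env var is an error.
        if dist.is_initialized() and fetch_fn is not None:
            dist_value = int(fetch_fn())
            if env_var in os.environ and int(os.environ[env_var]) != dist_value:
                raise RuntimeError(f"{env_var} disagrees with the process group value for {human_name}")
            return dist_value
        # 2. Otherwise fall back to the environment variable.
        if env_var in os.environ:
            return int(os.environ[env_var])
        # 3. Otherwise warn and return the default (0 for the rank helpers shown in this diff).
        warnings.warn(f"{env_var} not set; returning {default} for {human_name}")
        return default

    # e.g. resolve("WORLD_SIZE", "world size", default=1, fetch_fn=dist.get_world_size)
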
@@ -73,24 +81,17 @@ def get_local_rank() -> int:
     Returns:
         int: The local world size
     """
-    local_rank = _get_distributed_config_var(env_var="LOCAL_RANK", human_name="local rank", default=0)
-    assert local_rank == get_global_rank() % get_local_world_size(), "invariant violation"
-    return local_rank
+    return _get_distributed_config_var(env_var="LOCAL_RANK", human_name="local rank", default=0)
 
 
 def get_node_rank() -> int:
     """Returns the node rank. For example, if there are 2 nodes, and 2 ranks per node, then
     global ranks 0-1 will have a node rank of 0, and global ranks 2-3 will have a node rank of 1.
-    .. note::
-        This function assumes an equal number of ranks (processes) per node, as determined by
-        :meth:`get_local_world_size`.
     Returns:
         int: The node rank, starting at 0.
     """
-    return get_global_rank() // get_local_world_size()
+    return _get_distributed_config_var(env_var="NODE_RANK", human_name="node rank", default=0)
 
 
 def barrier() -> None:
@@ -235,23 +236,18 @@ def initialize_dist(backend: str, timeout: datetime.timedelta):
         return
 
     if dist.is_initialized():
-        if not dist.get_backend() == backend.lower():
-            warnings.warn(f"The requested backend ({backend}) differs from the backend "
-                          f"of the current process group ({dist.get_backend()})."
-                          "If you wish to change backends, please restart the python process.")
+        if dist.get_backend() != backend.lower():
+            raise RuntimeError(f"The requested backend ({backend}) differs from the backend "
+                               f"of the current process group ({dist.get_backend()}). If you "
+                               "wish to change backends, please restart the python process.")
         return
 
-    if "RANK" in os.environ and "WORLD_SIZE" in os.environ:
-        # Assume we can initialize based off of env vars
+    if "RANK" not in os.environ or "WORLD_SIZE" not in os.environ:
+        warnings.warn("NoDistributedWarning: RANK and WORLD_SIZE env vars not set; assuming no "
+                      "parallelization. If this is unexpected, make sure you are running your "
+                      "training script with the composer CLI tool.")
+    elif get_world_size() > 1:
         dist.init_process_group(backend, timeout=timeout)
         return
 
-    warnings.warn("NoDistributedWarning: RANK and WORLD_SIZE env vars not set; assuming no parallelization. "
-                  "If this is unexpected, make sure you are running your training script with the "
-                  "composer executable.")
-    store = dist.HashStore()
-
-    dist.init_process_group(backend, timeout=timeout, store=store, world_size=1, rank=0)
 
 
 def get_sampler(dataset, *, drop_last: bool, shuffle: bool) -> torch.utils.data.Sampler:
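
Taken together with the commit title, the initialize_dist() changes above mean a single-process run no longer creates a process group at all: the HashStore-backed world_size=1 initialization is removed, and a backend mismatch with an already-initialized process group is now a hard error rather than a warning. A rough, hypothetical check of the expected single-process behavior (an assumption based on this diff, not a test from the repository):

    import datetime

    import torch.distributed as dist

    from composer.utils.dist import initialize_dist  # module path as in this commit

    # With RANK and WORLD_SIZE unset, initialize_dist should warn
    # (NoDistributedWarning) and return without creating a process group.
    initialize_dist(backend="gloo", timeout=datetime.timedelta(seconds=15))
    assert not dist.is_initialized()
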
