Don't init dist when world_size is 1 #311

Merged (10 commits) on Jan 31, 2022
17 changes: 15 additions & 2 deletions composer/cli/launcher.py
@@ -39,6 +39,11 @@ def get_parser():
"Specifying a base_rank B and an nproc N will spawn processes "
"with global ranks [B, B+1, ... B+N-1]. Defaults to 0 (single-node "
"operation).")
parser.add_argument("--node_rank",
type=int,
default=-1,
help="The rank of this node. Set to -1 to assume that all nodes have "
"the same number of processes, and calculate accordingly. Defaults to -1.")
parser.add_argument("--master_addr",
type=str,
default="127.0.0.1",
@@ -88,11 +93,17 @@ def parse_args():
if args.world_size == -1:
args.world_size = args.nproc

if args.node_rank == -1:
if args.base_rank % args.nproc != 0:
raise ValueError("node_rank not specified, but unable to infer since nodes appear to "
"have different amounts of processes.")
args.node_rank = args.base_rank // args.nproc

return args


def launch_processes(nproc: int, world_size: int, base_rank: int, master_addr: str, master_port: Optional[int],
module_mode: bool, run_directory: Optional[str], training_script: str,
def launch_processes(nproc: int, world_size: int, base_rank: int, node_rank: int, master_addr: str,
master_port: Optional[int], module_mode: bool, run_directory: Optional[str], training_script: str,
training_script_args: List[Any]) -> Set[subprocess.Popen]:
log.info("Starting DDP on local node for global_rank(%s-%s)", base_rank, base_rank + nproc - 1)
processes = []
@@ -120,6 +131,7 @@ def launch_processes(nproc: int, world_size: int, base_rank: int, master_addr: s
current_env["WORLD_SIZE"] = str(world_size)
current_env["LOCAL_RANK"] = str(local_rank)
current_env["LOCAL_WORLD_SIZE"] = str(nproc)
current_env["NODE_RANK"] = str(node_rank)
current_env["MASTER_ADDR"] = master_addr
current_env["MASTER_PORT"] = str(master_port)
current_env["COMPOSER_RUN_DIRECTORY"] = run_directory
@@ -255,6 +267,7 @@ def main():
processes = launch_processes(nproc=args.nproc,
world_size=args.world_size,
base_rank=args.base_rank,
node_rank=args.node_rank,
master_addr=args.master_addr,
master_port=args.master_port,
module_mode=args.module_mode,
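For reference, the node-rank inference added to parse_args above reduces to the following standalone sketch (infer_node_rank is a hypothetical helper used only for illustration; the launcher performs this check inline):

def infer_node_rank(base_rank: int, nproc: int) -> int:
    # Assumes every node runs the same number of processes (nproc).
    if base_rank % nproc != 0:
        raise ValueError("node_rank not specified, and it cannot be inferred because "
                         "nodes appear to run different numbers of processes.")
    return base_rank // nproc

# With 8 processes per node: the node hosting global ranks 0-7 is node 0,
# and the node hosting global ranks 8-15 is node 1.
assert infer_node_rank(base_rank=0, nproc=8) == 0
assert infer_node_rank(base_rank=8, nproc=8) == 1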
2 changes: 0 additions & 2 deletions composer/core/engine.py
@@ -191,8 +191,6 @@ def _compile(
algorithms = sorted(algorithms_to_run,
key=lambda x: not isinstance(x, SelectiveBackprop) and not isinstance(x, StochasticDepth))

print(event, algorithms)

if event.is_after_event:
"""Establish a FILO queue of algorithms before_ and after_ an event.

66 changes: 31 additions & 35 deletions composer/utils/dist.py
@@ -19,22 +19,30 @@ def _get_distributed_config_var(env_var: str,
default: int,
fetch_fn_name: Optional[str] = None) -> int:
if not dist.is_available():
warnings.warn(
f"DistributedDefaultValueWarning: Torch distributed is not available; returning {default} for {human_name}")
warnings.warn("DistributedDefaultValueWarning: Torch distributed is not available; "
f"returning {default} for {human_name}")
return default

if not env_var in os.environ:
warnings.warn(f"DistributedDefaultValueWarning: {env_var} env var not set"
f"{' and process group not initialized' if fetch_fn_name is not None else ''}; "
f"returning {default} for {human_name}.")
env_value = default
else:
env_value = int(os.environ[env_var])

if dist.is_initialized() and fetch_fn_name is not None:
assert env_value == int(getattr(dist, fetch_fn_name)()), "invariant violation"
dist_value = int(getattr(dist, fetch_fn_name)())
if env_var in os.environ:
env_value = int(os.environ[env_var])
if dist_value != env_value:
raise RuntimeError("Torch distributed has been initialized with a value of "
f"{dist_value} for {human_name}, but environment variable "
f"{env_var} has value {env_value}.")
return dist_value

if env_var in os.environ:
return int(os.environ[env_var])

if dist.is_initialized():
raise RuntimeError("Torch distributed is initialized but environment variable "
f"{env_var} is not set.")

return env_value
warnings.warn(f"DistributedDefaultValueWarning: {env_var} env var not set and Torch "
f"distributed not initialized; returning {default} for {human_name}.")
return default


def get_world_size() -> int:
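To summarize the new control flow above, the precedence for a value such as the world size is: an initialized process group wins, then the environment variable exported by the launcher, then the documented default. A simplified sketch, with the fetch_fn_name consistency check omitted and resolve_world_size used purely as an illustrative name:

import os

import torch.distributed as dist


def resolve_world_size(default: int = 1) -> int:
    # 1) An initialized process group is authoritative.
    if dist.is_available() and dist.is_initialized():
        return dist.get_world_size()
    # 2) Otherwise, fall back to the env var set by the composer launcher.
    if "WORLD_SIZE" in os.environ:
        return int(os.environ["WORLD_SIZE"])
    # 3) Otherwise, assume a single-process run.
    return default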
@@ -73,24 +81,17 @@ def get_local_rank() -> int:
Returns:
int: The local world size
"""
local_rank = _get_distributed_config_var(env_var="LOCAL_RANK", human_name="local rank", default=0)
assert local_rank == get_global_rank() % get_local_world_size(), "invariant violation"
return local_rank
return _get_distributed_config_var(env_var="LOCAL_RANK", human_name="local rank", default=0)


def get_node_rank() -> int:
"""Returns the node rank. For example, if there are 2 nodes, and 2 ranks per node, then
global ranks 0-1 will have a node rank of 0, and global ranks 2-3 will have a node rank of 1.

.. note::

This function assumes an equal number of ranks (processes) per node, as determined by
:meth:`get_local_world_size`.

Returns:
int: The node rank, starting at 0.
"""
return get_global_rank() // get_local_world_size()
return _get_distributed_config_var(env_var="NODE_RANK", human_name="node rank", default=0)


def barrier() -> None:
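As a quick sanity check on the docstring's example above (2 nodes with 2 ranks per node), the NODE_RANK value the launcher now exports agrees with the mapping that was previously computed from the global rank; the numbers below are illustrative only:

local_world_size = 2
# global rank -> expected node rank for a 2-node, 2-ranks-per-node job
expected = {0: 0, 1: 0, 2: 1, 3: 1}
for global_rank, node_rank in expected.items():
    assert global_rank // local_world_size == node_rank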
@@ -235,236 def initialize_dist(backend: str, timeout: datetime.timedelta):
return

if dist.is_initialized():
if not dist.get_backend() == backend.lower():
warnings.warn(f"The requested backend ({backend}) differs from the backend "
f"of the current process group ({dist.get_backend()})."
"If you wish to change backends, please restart the python process.")
if dist.get_backend() != backend.lower():
raise RuntimeError(f"The requested backend ({backend}) differs from the backend "
f"of the current process group ({dist.get_backend()}). If you "
"wish to change backends, please restart the python process.")
return

if "RANK" in os.environ and "WORLD_SIZE" in os.environ:
# Assume we can initialize based off of env vars
if "RANK" not in os.environ or "WORLD_SIZE" not in os.environ:
warnings.warn("NoDistributedWarning: RANK and WORLD_SIZE env vars not set; assuming no "
"parallelization. If this is unexpected, make sure you are running your "
"training script with the composer CLI tool.")
elif get_world_size() > 1:
dist.init_process_group(backend, timeout=timeout)
return

warnings.warn("NoDistributedWarning: RANK and WORLD_SIZE env vars not set; assuming no parallelization. "
"If this is unexpected, make sure you are running your training script with the "
"composer executable.")
store = dist.HashStore()

dist.init_process_group(backend, timeout=timeout, store=store, world_size=1, rank=0)


def get_sampler(dataset, *, drop_last: bool, shuffle: bool) -> torch.utils.data.Sampler:
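Finally, the single-process fallback added to initialize_dist above amounts to the following: when RANK and WORLD_SIZE are unset (or the world size is 1), a process group of size 1 is created over an in-memory HashStore, so collective operations remain callable without any TCP rendezvous. A hedged sketch; the "gloo" backend and the timeout are placeholder values:

import datetime

import torch.distributed as dist

store = dist.HashStore()  # in-memory store; no MASTER_ADDR/MASTER_PORT rendezvous needed
dist.init_process_group("gloo", timeout=datetime.timedelta(seconds=300),
                        store=store, world_size=1, rank=0)

assert dist.get_world_size() == 1
assert dist.get_rank() == 0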