
Support different tf.distribute.Strategies for distributed training on SageMaker #391

Closed
anirudhacharya opened this issue Jun 23, 2020 · 14 comments · Fixed by aws/sagemaker-python-sdk#3192

Comments

@anirudhacharya

For strategies like MultiWorkerMirroredStrategy, TF2 requires us to configure each node individually (https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras#multi-worker_configuration). Currently SageMaker does not provide a way of doing this when launching a distributed training job with MultiWorkerMirroredStrategy via the estimator.fit() method.
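
For context, a minimal sketch of the per-node TF_CONFIG described in the linked TensorFlow tutorial; the host names and port below are illustrative placeholders, not values SageMaker sets:

import json
import os

# Every node lists the same cluster; only the task index differs per node.
cluster = {"worker": ["algo-1:12345", "algo-2:12345"]}

# On the first node:
os.environ["TF_CONFIG"] = json.dumps(
    {"cluster": cluster, "task": {"type": "worker", "index": 0}}
)
# On the second node only the index changes:
# {"cluster": cluster, "task": {"type": "worker", "index": 1}}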

@laurenyu
Contributor

@laurenyu If I have to set up things myself, then how can I use the estimator.fit() API to launch the training job? Can't TF_CONFIG be configured with the distributions parameter (like here)?

If you don't specify anything for distributions when creating your estimator, then no TF_CONFIG is set. You can then write your own config in your training script. estimator.fit() shouldn't interfere, since all it does is launch the training job, which then runs your script.

@anirudhacharya
Author

@laurenyu the config is different for different nodes in the cluster. Are you suggesting we do something like this -

# Start of train.py
import json
import os
import socket

if socket.gethostname() == "algo-1":
    # write TF_CONFIG for node-1
    ...
elif socket.gethostname() == "algo-2":
    # write TF_CONFIG for node-2
    ...

import tensorflow
# Start the actual training script

@laurenyu
Contributor

@anirudhacharya yep, that's exactly what I was thinking. You can also use the environment variable SM_CURRENT_HOST to get the host name (docs).

@anirudhacharya
Author

@laurenyu I can try this, but I am not sure it will work, because TF_CONFIG looks something like this:

import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': [<list of addresses & ports of the nodes that make up the cluster>]
    },
    'task': {'type': 'worker', 'index': 0}
})

With a conventional cluster setup, I can SSH into each node and get information like its IP address and port number; I am not sure how I would be able to do that from within the training script.

@nadiaya
Contributor

nadiaya commented Jul 10, 2020

Did you give it a try?
Hostname + port should work.
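
For reference, a minimal sketch of the host information SageMaker already exposes inside the training container through environment variables; the port (2222) and the values in the comments are illustrative assumptions:

import json
import os

hosts = json.loads(os.environ["SM_HOSTS"])    # e.g. ["algo-1", "algo-2"]
current_host = os.environ["SM_CURRENT_HOST"]  # e.g. "algo-1"
addresses = ["{}:2222".format(host) for host in hosts]  # hostname + an arbitrary free port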

@maddy392

maddy392 commented Jul 8, 2021

@nadiaya

While using the parameter server distribution type in the estimator.fit() call, I came across SageMaker's TF_CONFIG in the logs, such as the following (just an example for the parameter server distribution strategy):

TF_CONFIG={
        "cluster":
            {"master": ["algo-1:2222"],
             "ps": ["algo-1:2223", "algo-2:2223"],
             "worker": ["algo-2:2222"]},
        "environment": "cloud",
        "task": {"index": 0, "type": "master"}}

Questions:

  1. In our training scripts, how do we access the port numbers of the workers on each algo (host/instance)? I know we can use os.environ['SM_HOSTS'] to find the hosts, but what about the port numbers for each algo (host/instance), like 2222 and 2223 in the example above?
  2. If we are able to get the port numbers, then it's just a matter of defining the right TF_CONFIG to implement MultiWorkerMirroredStrategy, right?
  3. A working example of MultiWorkerMirroredStrategy using 2 or more multi-GPU instances (ml.p3.8xlarge or bigger) would be highly appreciated.

Thank you

@vdabravolski

Sharing an implementation of a working TF_CONFIG for MultiWorkerMirroredStrategy below. This has been tested on a SageMaker Deep Learning Container with TensorFlow v2.8 (link to dockerfile).

import json
import os


def _build_tf_config():
    """Build TF_CONFIG from SageMaker's SM_HOSTS and SM_CURRENT_HOST environment variables."""
    hosts = json.loads(os.getenv("SM_HOSTS"))
    current_host = os.getenv("SM_CURRENT_HOST")

    workers = hosts

    def host_addresses(hosts, port=7777):
        # Every worker uses the same fixed port; only the host name differs.
        return ["{}:{}".format(host, port) for host in hosts]

    tf_config = {"cluster": {}, "task": {}}
    tf_config["cluster"]["worker"] = host_addresses(workers)
    tf_config["task"] = {"index": workers.index(current_host), "type": "worker"}

    os.environ["TF_CONFIG"] = json.dumps(tf_config)
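
A minimal sketch of how this function would typically be called in the entry-point script, assuming no other TensorFlow collective work runs before the strategy is constructed (the model below is a placeholder):

import tensorflow as tf

_build_tf_config()  # set TF_CONFIG before the strategy is created

strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    # Placeholder model; replace with the real training code.
    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(10,))])
    model.compile(optimizer="adam", loss="mse")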

@Lokiiiiii
Contributor

Lokiiiiii commented May 17, 2022

Is there any update on this? Is anybody working on a PR?

I see that a TF_CONFIG setup is already implemented in https://github.com/aws/sagemaker-tensorflow-training-toolkit/blob/master/src/sagemaker_tensorflow_container/training.py#L37. It would only need some minor modifications for MWMS. The only remaining task is to add a new distribution option named 'multi_worker_mirrored' and include it in this condition: https://github.com/aws/sagemaker-tensorflow-training-toolkit/blob/master/src/sagemaker_tensorflow_container/training.py#L139

I will be happy to cut a PR for this if required. This has been open for way too long.

@TZeng20

TZeng20 commented Jun 23, 2022

Hey @Lokiiiiii, so is the aim of your feature to add a distribution argument for MultiWorkerMirroredStrategy in SageMaker? i.e.
estimator = sagemaker.tensorflow.TensorFlow(entry_point='script.py', ..., distribution={})
What would the distribution argument be? I know the distribution arg is needed for something like SageMaker distributed data parallel. Right now I use the function provided by @vdabravolski to set up the cluster config.

@Lokiiiiii
Contributor

Hey @Lokiiiiii, so is the aim of your feature to add a distribution argument for MultiWorkerMirroredStrategy in SageMaker? i.e. estimator = sagemaker.tensorflow.TensorFlow(entry_point='script.py', ..., distribution={}) What would the distribution argument be? I know the distribution arg is needed for something like SageMaker distributed data parallel. Right now I use the function provided by @vdabravolski to set up the cluster config.

Yes, I have suggested distribution = {'multi_worker_mirrored_strategy': {'enabled': True }} in this PR.
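
For illustration, a sketch of how the estimator call might look once that option is supported; the role ARN, instance settings, and framework/Python versions are placeholder assumptions:

from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="script.py",
    role="arn:aws:iam::111111111111:role/ExampleSageMakerRole",  # placeholder role
    instance_count=2,
    instance_type="ml.p3.8xlarge",
    framework_version="2.9",
    py_version="py39",
    distribution={"multi_worker_mirrored_strategy": {"enabled": True}},
)
estimator.fit()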

@TZeng20

TZeng20 commented Jun 24, 2022

Does anything else apart from TF_CONFIG need to be configured to make MultiWorkerMirroredStrategy work?
Let's say I use 2 instances of ml.g4dn.8xlarge, which has only 1 GPU per machine:
estimator = sagemaker.tensorflow.TensorFlow(entry_point='script.py', instance_count=2, ...)

If I use the same config from @vdabravolski, it identifies the 2 workers properly: len(tf_config['cluster']['worker']) = 2.
But it seems that tf.distribute.InputContext is unable to properly identify the number of replicas in sync or the number of input pipelines. Shouldn't num_input_pipelines and num_replicas_in_sync be 2?

import tensorflow as tf

strategy = tf.distribute.MultiWorkerMirroredStrategy()

def dataset_fn(input_context):
    dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(64).batch(16)
    print(input_context.num_input_pipelines, input_context.input_pipeline_id, input_context.num_replicas_in_sync)
    return dataset

dist_dataset = strategy.distribute_datasets_from_function(dataset_fn)

>>>
1 0 1

@Lokiiiiii
Contributor

Lokiiiiii commented Jul 4, 2022

tf.distribute.InputContext is unable to properly identify the number of replicas in sync or number of input pipelines

This seems like a discussion for https://github.com/tensorflow/tensorflow/issues?q=is%3Aissue+multiworkermirroredstrategy+

@surajitkundu-dazn

Quoting @vdabravolski's _build_tf_config() implementation above:

Where would you add this function? There is also a similar function here: https://docs.aws.amazon.com/sagemaker/latest/dg/training-compiler-tensorflow-models.html, but when I use this function at the start of my training script I see a runtime error (RuntimeError: Collective ops must be configured at program startup), and if I do it after creating the strategy it doesn't work as multi-node :(

@Lokiiiiii
Contributor

The cluster setup required for MultiWorkerMirroredStrategy is now available as part of the SageMaker Training Toolkit for TF >= 2.9. While we are still working on the SageMaker SDK interface for it, in the meantime you can leverage the cluster setup by setting the environment variable sagemaker_multi_worker_mirrored_strategy_enabled to 'true'.

As indicated by the error message, the cluster setup, or in this case the environment variable, needs to be set before the training script is executed. You can specify it via the environment keyword argument when creating the estimator if you are using the SageMaker SDK.
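
For example, a minimal sketch of passing that variable through the estimator's environment argument; the role ARN, instance settings, and framework/Python versions are placeholder assumptions:

from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="train.py",
    role="arn:aws:iam::111111111111:role/ExampleSageMakerRole",  # placeholder role
    instance_count=2,
    instance_type="ml.g4dn.8xlarge",
    framework_version="2.9",
    py_version="py39",
    environment={"sagemaker_multi_worker_mirrored_strategy_enabled": "true"},
)
estimator.fit()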
