Distributed TensorFlow
Distributed TensorFlow is the official way for multiple machines to communicate remotely with each other, and for multiple devices (GPUs) to be used.
Related is multi-GPU training in RETURNN via Horovod, but Horovod is an implementation orthogonal to what TF itself provides. Related are also the Tensorpack TF ZMQ ops, which provide another way for remote communication (via ZeroMQ). TensorFlow itself uses gRPC (for remote communication) and NCCL (for Nvidia direct GPU transfers). (What operators are there for remote communication?)
This wiki page is fully about the TF built-in features. It is less about RETURNN; it is mostly an extension of, or overview over, the official TF documentation.
- RETURNN: New dataset pipeline: draft (mostly about the dataset logic, but is related to distributed training, and also partly covers that)
- TensorFlow paper 2015: Large-Scale Machine Learning on Heterogeneous Distributed Systems (mostly covers already all the aspects here, not much has changed)
- TensorFlow API docs: `tf.distribute`
- TensorFlow guide: Distributed training
- TensorFlow examples: deploy: distributed TensorFlow
- Google Cloud doc: `TF_CONFIG` for distributed training
- Tensor2Tensor: distributed training
- TensorFlow ecosystem: distributed TensorFlow
- `tf.distribute.Server`
- `MultiWorkerMirroredStrategy`
- Multi-worker training with Keras
- `MultiDeviceIterator` (see code for insights)
- `remote_call`
- DeepMind: TF-Replicator: Distributed Machine Learning for Researchers (paper, resulting TF RFC)
- StackOverflow question: TensorFlow Master and Worker Service
- StackOverflow question: What has changed for distributed TensorFlow with TF 2?
There is a lot of new terminology around multi-GPU training, or distributed TensorFlow. It can also differ between frameworks (e.g. MPI or PyTorch have similar concepts but partly different terminology).
Multi-GPU training (or also multi-TPU) can be on a single machine (computer, host), or across multiple, where each machine can have multiple GPUs (devices).
In the case of multiple machines, this would usually be managed via some cluster engine.
A cluster engine manages a collection of machines, each having some computing resources (CPUs, GPUs, memory), and specifically it manages the queueing of jobs (processes, i.e. instances of a program) on them. A job has certain restrictions (use N CPUs, M GPUs, a specific amount of memory, run for up to T hours).
In our case, we mostly use Sun Grid Engine (SGE) (later renamed to Oracle Grid Engine). TensorFlow has official support for Google Compute Engine (GCE), Slurm, and some others but not SGE. But this is not so problematic.
In SGE, every computer is called a node. We adopt this terminology.
A node in our cluster has usually 4 GPUs. Multiple jobs can run on a single node, if not all resources are used by a single job.
(SGE specific: A job can also run across multiple nodes, via the `-pe` option for `qsub`, which will effectively run one process per node (in parallel). This is the parallel environments feature. See here for an example. You need this if you want to do multi-node multi-GPU training.)
You must run at least one instance of your program per node. In practice, we will likely run one instance per GPU device per node, so each instance will have access to a single GPU. The instances will communicate somehow with each other (e.g. using Horovod or distributed TensorFlow; this is what this wiki page covers; internally via NCCL, MPI, gRPC or similar). There might be one master (or chief) instance, which takes care of everything (initializing the computation graph, loading data, saving checkpoints, etc.), while all the other instances just idle around and wait for commands from the master. Or the other instances might also do everything (or most things) on their own, but then synchronize somehow with each other.
See also the glossary of `tf.distribute`, which partly covers this as well, and the distributed TensorFlow deploy example, which also covers the terminology and gives an overview.
- Device: A (single, specific) GPU (or also TPU).
- Task: Basically this is equivalent to one instance of the program, i.e. one process. Or more specifically, one instance of a TF server (i.e. `tf.distribute.Server`). This is in the context of `ClusterSpec` and when running a TF server. It specifies one specific instance, i.e. one specific TF server, running on some host, which is reachable under `"<host>:<port>"`. A task has a task type, sometimes also referred to as job, such as `"worker"` or `"chief"`. Within a job (e.g. the workers), there are a number of tasks, and each one has a specific index, determined by the list of all tasks of this job in the `ClusterSpec`.
- Job: Synonym for task type (or also job name). A job comprises a list of tasks, which typically serve a common purpose. For example `"ps"` or `"worker"`. Do not confuse this with the cluster job: a single cluster job (e.g. via SGE `-pe`) would cover all the TF tasks which belong together.
- Worker and chief (master) (and maybe parameter server): These are task types, i.e. the roles of one instance of a TF server. "Worker" can thus refer to such a task, to its task type (job name), or to the host it runs on.
- Worker: This task will execute a single train step (on one copy of the model).
- Chief (master): This task is a worker with some extra responsibilities such as saving the checkpoint.
- Parameter server: These are machines that hold a single copy of parameters/variables. They don't have much to do, except for managing all reads/writes to the variables. This is usually used for async training, as all other workers would access the parameter server(s) in an async way. This is the default setup for async training in TF. But this is not really necessary in general for async training. In RETURNN, we would do async training, but sync the models directly. (Maybe it depends on the specific definition of async...)
- Host: Can mean a machine, or a specific process. Mostly refers to the TF server, specified via the address it is reachable under (`"<host>:<port>"`).
- (TensorFlow) Server: Instance of `tf.distribute.Server` (earlier `tf.train.Server`). This is basically one host, or one task. Part of a cluster. A TF session (client) can connect to a server and then execute commands (sub-graphs).
- Client: Instance of `tf.compat.v1.Session` (earlier `tf.Session`) which connects to a server. A client also constructs the computation graph.
- Cluster: A collection of tasks, i.e. a collection of machines (more specifically, running TF servers, reachable under `"<host>:<port>"`), that participate in the distributed execution of a TensorFlow graph. This is described via a `ClusterSpec` (see below).
- Cluster resolver: E.g. for GCE, it will automatically figure out the environment, how many nodes there are, and derive the number of workers or machines, and their addresses, for the `ClusterSpec` (see below).
- Distributed strategy: An instance of a subclass / implementation of `tf.distribute.Strategy`. This is the TensorFlow API to distribute training across multiple GPUs, multiple machines or TPUs. Using this API, you can distribute your existing models and training code with minimal code changes (see the sketch after this list).
- Replica: One copy of the model (parameters and computation graph), running on one slice of the input data. Right now each replica is executed on its own worker device. A replica may span multiple worker devices (for model parallelism).
- In-graph replication: A single client builds a single `tf.Graph` that contains one set of parameters, and multiple copies of the compute-intensive part of the model, each pinned to a different task in `"/job:worker"` (via `tf.device`).
- Between-graph replication: There is a separate client for each worker task, typically in the same process as the worker task. Each client builds a similar graph containing the parameters and a single copy of the compute-intensive part of the model, pinned to the local task in `"/job:worker"`.
- Asynchronous training: Each replica of the graph has an independent training loop that executes without coordination. It is compatible with both forms of replication above. This usually uses a parameter server, but there are other options as well.
- Synchronous training: All of the replicas read the same values for the current parameters, compute gradients in parallel, and then apply them together. It is compatible with in-graph replication and between-graph replication.
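As a concrete illustration of the distributed strategy terminology, here is a minimal sketch of synchronous multi-worker data-parallel training with `tf.distribute.MultiWorkerMirroredStrategy` (assuming a recent TF 2.x; the tiny Keras model and random dataset are just placeholders, not RETURNN code):

```python
import tensorflow as tf

# Each worker process must have TF_CONFIG set (see below) before this call,
# so the strategy can resolve the cluster and this task's role in it.
strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    # Variables created here are mirrored (replicated) on every worker.
    model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
    model.compile(optimizer="sgd", loss="mse")

# Dummy in-memory dataset; in practice each worker would read its own shard.
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.random.normal([64, 10]), tf.random.normal([64, 1]))).batch(8)

# Gradients are all-reduced across workers in every step (synchronous training).
model.fit(dataset, epochs=1)
```

If `TF_CONFIG` is not set, this should fall back to a single local worker, which is convenient for testing.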
The `TF_CONFIG` environment variable is used by TensorFlow (always? or in what cases?) to set up distributed sessions (?).
Example (via):
```python
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})
```
This is a dict (JSON-encoded), having these entries: `cluster` and `task`.
`cluster`: This is a `tf.train.ClusterSpec`. Dict containing:

- `worker`: list of strings (`"<host>:<port>"`) of TF servers.
- All other task types (see below) are possible keys as well (`'ps'`, `'chief'`, ...).
- `local`: Not sure...

The `cluster` definition would be shared for all nodes. This is used by the TF server.
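For illustration, such a `cluster` dict maps directly onto a `tf.train.ClusterSpec` (host names and ports below are made up):

```python
import tensorflow as tf

# Two job names ("worker" and "ps"), each with a list of "<host>:<port>" tasks.
cluster = tf.train.ClusterSpec({
    "worker": ["node01:2222", "node02:2222"],
    "ps": ["node01:2223"],
})

print(cluster.jobs)                       # job names, e.g. ['ps', 'worker']
print(cluster.task_address("worker", 1))  # 'node02:2222'
```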
`task`: Dict containing:

- `type`: `'worker'`, `'ps'` (parameter server), `'evaluator'`, `'chief'` or `'master'`. Usually it will be a worker. A chief is a worker which does a bit more, like saving the checkpoint (like rank 0 in MPI).
- `index`: int. This task index corresponds to the rank in MPI terminology, although it is counted per type, i.e. there can be both a chief with index 0 and a worker with index 0.
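Putting `cluster` and `task` together, here is a sketch (not the RETURNN code) of how each process could parse `TF_CONFIG` and start the TF server for its own task:

```python
import json
import os

import tensorflow as tf

tf_config = json.loads(os.environ["TF_CONFIG"])

cluster = tf.train.ClusterSpec(tf_config["cluster"])
task_type = tf_config["task"]["type"]    # e.g. "worker" or "chief"
task_index = tf_config["task"]["index"]  # rank within this task type

# Start the gRPC server for this task; it serves requests from the other tasks.
server = tf.distribute.Server(cluster, job_name=task_type, task_index=task_index)

if task_type == "chief":
    pass  # chief: build the graph, run the training loop, save checkpoints, ...
else:
    server.join()  # e.g. in an in-graph/PS setup, other tasks just serve requests
```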
The TF server (`tf.distribute.Server`) represents one task, or host, as part of a cluster.
Used for TF sessions to connect to (remotely, via gRPC). Example for a single local server (where the cluster only consists of this single local server) (via):
```python
import tensorflow as tf

server = tf.train.Server.create_local_server()
sess = tf.Session(server.target)
```
(Difference to standard session.)
When starting the server, you can choose any port you want.
But all the ports (and hostnames) must be known in advance, to be able to specify the `ClusterSpec`.
(Is it possible to add a worker on-the-fly? Why does a TF server need to know about all other tasks/workers in the cluster?)
Usually a single `tf.Session` (client) will connect to one server, although it is possible that multiple clients connect to a single server.
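To make the client/server relationship concrete, here is a minimal sketch (TF1-style graph/session API via `tf.compat.v1`; ports are arbitrary) with two in-process servers forming one cluster. The session connects to the first server, but the op is pinned to the second task, so it is executed remotely via gRPC:

```python
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

cluster = tf.train.ClusterSpec({"worker": ["localhost:2222", "localhost:2223"]})
server0 = tf.train.Server(cluster, job_name="worker", task_index=0)
server1 = tf.train.Server(cluster, job_name="worker", task_index=1)

# Pin this op to the second task of the "worker" job.
with tf.device("/job:worker/task:1"):
    x = tf.constant(2) * tf.constant(21)

# The client connects to server0, which forwards the sub-graph to task 1.
with tf.Session(server0.target) as sess:
    print(sess.run(x))  # 42
```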
(On many operations/tensors, e.g. `initializable_iterator` / `make_initializable_iterator`.) SO question, my SO question.
Not sure...
(Used by `MultiDeviceIterator`.)
(StackOverflow question: `remote_call` vs `tf.device`.)
Higher-level API...
tf.train.replica_device_setter
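A short sketch of what `tf.train.replica_device_setter` does (again TF1-style API; host names are placeholders): it returns a device function that places variables on the `"/job:ps"` tasks and all other ops on the given worker device:

```python
import tensorflow.compat.v1 as tf
tf.disable_eager_execution()

cluster = tf.train.ClusterSpec({
    "ps": ["node01:2222"],
    "worker": ["node02:2222", "node03:2222"],
})

# Variables land on /job:ps, ops on the local worker (here task 0).
device_fn = tf.train.replica_device_setter(
    cluster=cluster, worker_device="/job:worker/task:0")

with tf.device(device_fn):
    w = tf.get_variable("w", shape=[10, 10])  # placed on /job:ps/task:0
    y = tf.matmul(w, w)                       # placed on /job:worker/task:0
```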