Sync OSS keras to tensorflow/tensorflow@6af4297 (where TF 2.5 RC is cut)
Preparing for the Keras 2.5 RC cut.

PiperOrigin-RevId: 365252577
qlzh727 authored and tensorflower-gardener committed Mar 26, 2021
1 parent d71247d commit 9c26610
Showing 7 changed files with 127 additions and 39 deletions.
keras/backend.py (3 changes: 2 additions & 1 deletion)

@@ -4066,7 +4066,8 @@ def func(model_inputs):
       outs = model(model_inputs)
       if wrap_outputs:
         outs = [outs]
-      return tf_utils.to_numpy_or_python_type(outs)
+      return tf_utils.sync_to_numpy_or_python_type(outs)
+
     return func

   if kwargs:
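The `to_numpy_or_python_type` to `sync_to_numpy_or_python_type` rename above recurs through every file in this commit. As a reading aid, here is a minimal sketch of the behavior the new name advertises; it is an illustration written for this review, not the Keras source, and it assumes the helper blocks on a `ClusterCoordinator` `RemoteValue` before converting results to NumPy/Python types:

import numpy as np
import tensorflow as tf

def sync_to_numpy_or_python_type(tensors):
  """Sketch: wait for values to be ready, then convert to NumPy/Python types."""
  if hasattr(tensors, 'fetch'):
    # A ClusterCoordinator RemoteValue: fetch() blocks until workers report back.
    tensors = tensors.fetch()

  def _to_single_value(t):
    if isinstance(t, tf.Tensor):
      t = t.numpy()  # synchronizes with the async eager runtime
    # Return 0-d arrays as plain Python scalars; leave everything else as-is.
    return t.item() if isinstance(t, np.ndarray) and t.ndim == 0 else t

  return tf.nest.map_structure(_to_single_value, tensors)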
keras/callbacks.py (48 changes: 35 additions & 13 deletions)

@@ -222,6 +222,8 @@ def __init__(self,
         cb._implements_predict_batch_hooks() for cb in self.callbacks)
     # pylint: enable=protected-access

+    self._disallow_batch_hooks_in_ps_strategy()
+
     # Performance check: Check batch hooks for slowness compared to batch time.
     # Only run check for custom callbacks (i.e. not present in this file).
     self._check_timing = any(
@@ -336,7 +338,7 @@ def _call_batch_hook_helper(self, hook_name, batch, logs):
         hook(batch, logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         hook(batch, numpy_logs)

     if self._check_timing:
@@ -387,7 +389,7 @@ def on_epoch_begin(self, epoch, logs=None):
         callback.on_epoch_begin(epoch, logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_epoch_begin(epoch, numpy_logs)

   def on_epoch_end(self, epoch, logs=None):
@@ -408,7 +410,7 @@ def on_epoch_end(self, epoch, logs=None):
         callback.on_epoch_end(epoch, logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_epoch_end(epoch, numpy_logs)

   def on_train_batch_begin(self, batch, logs=None):
@@ -491,7 +493,7 @@ def on_train_begin(self, logs=None):
         callback.on_train_begin(logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_train_begin(numpy_logs)

   def on_train_end(self, logs=None):
@@ -508,7 +510,7 @@ def on_train_end(self, logs=None):
         callback.on_train_end(logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_train_end(numpy_logs)

   def on_test_begin(self, logs=None):
@@ -525,7 +527,7 @@ def on_test_begin(self, logs=None):
         callback.on_test_begin(logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_test_begin(numpy_logs)

   def on_test_end(self, logs=None):
@@ -542,7 +544,7 @@ def on_test_end(self, logs=None):
         callback.on_test_end(logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_test_end(numpy_logs)

   def on_predict_begin(self, logs=None):
@@ -559,7 +561,7 @@ def on_predict_begin(self, logs=None):
         callback.on_predict_begin(logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_predict_begin(numpy_logs)

   def on_predict_end(self, logs=None):
@@ -576,12 +578,32 @@ def on_predict_end(self, logs=None):
         callback.on_predict_end(logs)
       else:
         if numpy_logs is None:  # Only convert once.
-          numpy_logs = tf_utils.to_numpy_or_python_type(logs)
+          numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
         callback.on_predict_end(numpy_logs)

   def __iter__(self):
     return iter(self.callbacks)

+  def _disallow_batch_hooks_in_ps_strategy(self):
+    """Error out if batch-level callbacks are passed with PSStrategy."""
+    # pylint: disable=protected-access
+    strategy = tf.distribute.get_strategy()
+    if strategy._should_use_with_coordinator:
+      unsupported_callbacks = []
+      for cb in self.callbacks:
+        # These Callbacks can accept RemoteValues directly.
+        if getattr(cb, '_supports_tf_logs', False):
+          continue
+        if (cb._implements_train_batch_hooks() or
+            cb._implements_test_batch_hooks() or
+            cb._implements_predict_batch_hooks()):
+          unsupported_callbacks.append(cb)
+      if unsupported_callbacks:
+        raise ValueError('Batch-level `Callback`s are not supported with '
+                         '`ParameterServerStrategy`. Found unsupported '
+                         'callbacks: {}'.format(unsupported_callbacks))
+    # pylint: enable=protected-access
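From the user's side, this check turns what would have been silently broken batch-level logging into an immediate, actionable error. A hedged repro sketch follows; `cluster_resolver` and `dataset` are placeholders assumed to exist, and the snippet is illustrative rather than taken from this commit:

import tensorflow as tf

class BatchLogger(tf.keras.callbacks.Callback):

  def on_train_batch_end(self, batch, logs=None):  # a batch-level hook
    print(batch, logs)

# `cluster_resolver` (a ClusterResolver for a running PS cluster) and
# `dataset` are assumed to be defined elsewhere.
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
with strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
  model.compile(optimizer='sgd', loss='mse')

# Raises ValueError: Batch-level `Callback`s are not supported with
# `ParameterServerStrategy`. Found unsupported callbacks: [...]
model.fit(dataset, epochs=2, steps_per_epoch=10, callbacks=[BatchLogger()])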


@keras_export('keras.callbacks.Callback')
class Callback:
@@ -933,7 +955,7 @@ def on_batch_end(self, batch, logs=None):
     logs = logs or {}
     loss = logs.get('loss')
     if loss is not None:
-      loss = tf_utils.to_numpy_or_python_type(loss)
+      loss = tf_utils.sync_to_numpy_or_python_type(loss)
       if np.isnan(loss) or np.isinf(loss):
         print('Batch %d: Invalid loss, terminating training' % (batch))
         self.model.stop_training = True
@@ -1083,11 +1105,11 @@ def _batch_update_progbar(self, batch, logs=None):

     if self.verbose == 1:
       # Only block async when verbose = 1.
-      logs = tf_utils.to_numpy_or_python_type(logs)
+      logs = tf_utils.sync_to_numpy_or_python_type(logs)
       self.progbar.update(self.seen, list(logs.items()), finalize=False)

   def _finalize_progbar(self, logs, counter):
-    logs = tf_utils.to_numpy_or_python_type(logs or {})
+    logs = tf_utils.sync_to_numpy_or_python_type(logs or {})
     if self.target is None:
       if counter is not None:
         counter = counter.numpy()
@@ -1382,7 +1404,7 @@ def _save_model(self, epoch, logs):
     if isinstance(self.save_freq,
                   int) or self.epochs_since_last_save >= self.period:
       # Block only when saving interval is reached.
-      logs = tf_utils.to_numpy_or_python_type(logs)
+      logs = tf_utils.sync_to_numpy_or_python_type(logs)
       self.epochs_since_last_save = 0
       filepath = self._get_file_path(epoch, logs)

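Every hook edit in this file follows the same pattern, worth isolating once: logs are converted (and the runtime synchronized) at most once per hook call, and only for callbacks that cannot consume raw tensors or `RemoteValue`s. A sketch of the shape of that pattern, using `on_epoch_end` as the example and assuming `tf_utils` is in scope:

def dispatch_epoch_end(callbacks, epoch, logs):
  """Sketch of the lazy one-time conversion used by the hooks above."""
  numpy_logs = None
  for callback in callbacks:
    if getattr(callback, '_supports_tf_logs', False):
      # Opted-in callbacks receive tensors/RemoteValues untouched: no sync.
      callback.on_epoch_end(epoch, logs)
    else:
      if numpy_logs is None:  # Convert, and potentially block, at most once.
        numpy_logs = tf_utils.sync_to_numpy_or_python_type(logs)
      callback.on_epoch_end(epoch, numpy_logs)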
keras/distribute/dataset_creator_model_fit_test.py (62 changes: 57 additions & 5 deletions)

@@ -100,10 +100,16 @@ def _model_fit(self,
                 x=None,
                 steps_per_epoch=10,
                 run_eagerly=False,
-                with_normalization_layer=False):
-    model, callbacks = self._model_compile(strategy, steps_per_execution,
-                                           run_eagerly,
-                                           with_normalization_layer)
+                with_normalization_layer=False,
+                callbacks=None):
+    if callbacks is None:
+      callbacks = []
+
+    model, default_callbacks = self._model_compile(strategy,
+                                                   steps_per_execution,
+                                                   run_eagerly,
+                                                   with_normalization_layer)
+    callbacks += default_callbacks

     def dataset_fn(input_context):
       del input_context
@@ -118,7 +124,6 @@ def dataset_fn(input_context):
         x,
         epochs=10,
         steps_per_epoch=steps_per_epoch,
-        verbose=0,
         callbacks=callbacks,
         validation_data=validation_data)
     return model
@@ -200,6 +205,53 @@ def testClusterCoordinatorSingleInstance(self, strategy):
     self.assertIs(strategy._cluster_coordinator,
                   tf.distribute.experimental.coordinator.ClusterCoordinator(strategy))

+  def testModelFitErrorOnBatchLevelCallbacks(self, strategy):
+
+    class BatchLevelCallback(callbacks_lib.Callback):
+
+      def on_train_batch_end(self, batch, logs=None):
+        pass
+
+    with self.assertRaisesRegex(ValueError,
+                                "Batch-level `Callback`s are not supported"):
+      callbacks = [BatchLevelCallback()]
+      self._model_fit(strategy, callbacks=callbacks)
+
+  def testModelFitCallbackSupportsTFLogs(self, strategy):
+
+    class MyCallback(callbacks_lib.Callback):
+
+      def __init__(self):
+        super(MyCallback, self).__init__()
+        # Fetches the RemoteValues if necessary.
+        self._supports_tf_logs = True
+
+      def on_train_batch_end(self, batch, logs=None):
+        assert isinstance(logs, tf.distribute.experimental.coordinator.RemoteValue)
+
+    my_callback = MyCallback()
+    callbacks = [my_callback]
+    self._model_fit(strategy, callbacks=callbacks)
+
+  def testModelFitVerbosity(self, strategy):
+
+    class MyCallback(callbacks_lib.Callback):
+      pass
+
+    my_callback = MyCallback()
+    callbacks = [my_callback]
+    self._model_fit(strategy, callbacks=callbacks)
+    # PSStrategy should default to epoch-level logging.
+    self.assertEqual(my_callback.params["verbose"], 2)
+
+  def testModelFitTensorBoardEpochLevel(self, strategy):
+    log_dir = self.get_temp_dir()
+    callbacks = [callbacks_lib.TensorBoard(log_dir)]
+    self._model_fit(strategy, callbacks=callbacks)
+    self.assertTrue(tf.compat.v1.gfile.Exists(log_dir))
+    files = tf.compat.v1.gfile.ListDirectory(log_dir)
+    self.assertGreaterEqual(len(files), 1)
+

 if __name__ == "__main__":
   tf.compat.v1.enable_v2_behavior()
keras/engine/data_adapter.py (6 changes: 0 additions & 6 deletions)

@@ -1313,9 +1313,6 @@ def _validate_data_handler(self):
           "`steps_per_execution > 1`, you must specify the number of steps "
           "to run.")

-  def resolve_logs(self, logs):
-    return logs
-

 class _ClusterCoordinatorDataHandler(DataHandler):
   """A `DataHandler` that is compatible with `ClusterCoordinator`."""
@@ -1344,9 +1341,6 @@ def per_worker_dataset_fn():
   def sync(self):
     self._model._cluster_coordinator.join()  # pylint: disable=protected-access

-  def resolve_logs(self, logs):
-    return logs.fetch()
-

 def get_data_handler(*args, **kwargs):
   if getattr(kwargs["model"], "_cluster_coordinator", None):
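The two `resolve_logs` deletions above are the counterpart of the rename: the base handler returned `logs` unchanged while the coordinator handler returned `logs.fetch()`, and that per-handler dispatch is no longer needed once `tf_utils.sync_to_numpy_or_python_type`, called from the training.py hunk below, is expected to fetch `RemoteValue`s itself (as in the sketch after the backend.py section).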
keras/engine/training.py (34 changes: 23 additions & 11 deletions)

@@ -854,7 +854,7 @@ def fit(self,
          y=None,
          batch_size=None,
          epochs=1,
-         verbose=1,
+         verbose='auto',
          callbacks=None,
          validation_split=0.,
          validation_data=None,
@@ -915,18 +915,24 @@ def fit(self,
             The model is not trained for a number of iterations
             given by `epochs`, but merely until the epoch
             of index `epochs` is reached.
-        verbose: 0, 1, or 2. Verbosity mode.
+        verbose: 'auto', 0, 1, or 2. Verbosity mode.
             0 = silent, 1 = progress bar, 2 = one line per epoch.
-            Note that the progress bar is not particularly useful when
-            logged to a file, so verbose=2 is recommended when not running
-            interactively (eg, in a production environment).
+            'auto' defaults to 1 for most cases, but 2 when used with
+            `ParameterServerStrategy`. Note that the progress bar is not
+            particularly useful when logged to a file, so verbose=2 is
+            recommended when not running interactively (eg, in a production
+            environment).
         callbacks: List of `keras.callbacks.Callback` instances.
             List of callbacks to apply during training.
             See `tf.keras.callbacks`. Note `tf.keras.callbacks.ProgbarLogger`
             and `tf.keras.callbacks.History` callbacks are created automatically
             and need not be passed into `model.fit`.
             `tf.keras.callbacks.ProgbarLogger` is created or not based on
             `verbose` argument to `model.fit`.
+            Callbacks with batch-level calls are currently unsupported with
+            `tf.distribute.experimental.ParameterServerStrategy`, and users are
+            advised to implement epoch-level calls instead with an appropriate
+            `steps_per_epoch` value.
         validation_split: Float between 0 and 1.
             Fraction of the training data to be used as validation data.
             The model will set apart this fraction of the training data,
@@ -1075,6 +1081,12 @@ def fit(self,
     self._check_call_args('fit')
     _disallow_inside_tf_function('fit')

+    if verbose == 'auto':
+      if self.distribute_strategy._should_use_with_coordinator:  # pylint: disable=protected-access
+        verbose = 2  # Default to epoch-level logging for PSStrategy.
+      else:
+        verbose = 1  # Default to batch-level logging otherwise.
+
     if validation_split:
       # Create the validation data using the training data. Only supported for
       # `Tensor` and `NumPy` input.
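The six added lines above are the entire mechanism behind the new `verbose='auto'` default documented earlier. A usage sketch, assuming a compiled `model` and a `dataset` placeholder:

# Under ParameterServerStrategy, 'auto' resolves to 2 (one line per epoch),
# which avoids the per-batch sync a progress bar would force; everywhere
# else it resolves to 1 (progress bar), matching the old default.
model.fit(dataset, epochs=10, steps_per_epoch=100, verbose='auto')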
@@ -1152,7 +1164,7 @@ def fit(self,
               if self.stop_training:
                 break

-      logs = data_handler.resolve_logs(logs)
+      logs = tf_utils.sync_to_numpy_or_python_type(logs)
       if logs is None:
         raise ValueError('Expect x to be a non-empty array or dataset.')
       epoch_logs = copy.copy(logs)
@@ -1455,7 +1467,7 @@ def evaluate(self,
             logs = tmp_logs  # No error, now safe to assign to logs.
             end_step = step + data_handler.step_increment
             callbacks.on_test_batch_end(end_step, logs)
-    logs = tf_utils.to_numpy_or_python_type(logs)
+    logs = tf_utils.sync_to_numpy_or_python_type(logs)
     callbacks.on_test_end(logs=logs)

     if return_dict:
@@ -1705,7 +1717,7 @@ def predict(self,
         raise ValueError('Expect x to be a non-empty array or dataset.')
       callbacks.on_predict_end()
     all_outputs = tf.__internal__.nest.map_structure_up_to(batch_outputs, concat, outputs)
-    return tf_utils.to_numpy_or_python_type(all_outputs)
+    return tf_utils.sync_to_numpy_or_python_type(all_outputs)

   def reset_metrics(self):
     """Resets the state of all the metrics in the model.
Expand Down Expand Up @@ -1789,7 +1801,7 @@ class during training. This can be useful to tell the model to "pay

if reset_metrics:
self.reset_metrics()
logs = tf_utils.to_numpy_or_python_type(logs)
logs = tf_utils.sync_to_numpy_or_python_type(logs)
if return_dict:
return logs
else:
@@ -1847,7 +1859,7 @@ def test_on_batch(self,

     if reset_metrics:
       self.reset_metrics()
-    logs = tf_utils.to_numpy_or_python_type(logs)
+    logs = tf_utils.sync_to_numpy_or_python_type(logs)
     if return_dict:
       return logs
     else:
@@ -1877,7 +1889,7 @@ def predict_on_batch(self, x):
     iterator = data_adapter.single_batch_iterator(self.distribute_strategy, x)
     self.predict_function = self.make_predict_function()
     outputs = self.predict_function(iterator)
-    return tf_utils.to_numpy_or_python_type(outputs)
+    return tf_utils.sync_to_numpy_or_python_type(outputs)

   def fit_generator(self,
                     generator,
keras/optimizer_v2/utils.py (2 changes: 1 addition & 1 deletion)

@@ -151,4 +151,4 @@ def strategy_supports_no_merge_call():
   if not tf.distribute.has_strategy():
     return True
   strategy = tf.distribute.get_strategy()
-  return not getattr(strategy.extended, "_use_merge_call", True)
+  return not strategy.extended._use_merge_call()  # pylint: disable=protected-access
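`_use_merge_call` is now assumed to be a method on the strategy's extended object rather than an optional attribute, so the `getattr` fallback (with its `True` default) goes away. A small usage sketch of the helper, relying only on what the function above shows:

import tensorflow as tf
from keras.optimizer_v2 import utils as optimizer_utils

# Outside any strategy scope, has_strategy() is False and the helper
# short-circuits to True: nothing needs a merge_call.
print(optimizer_utils.strategy_supports_no_merge_call())  # True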