Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Modificar un script de entrenamiento de TensorFlow
En esta sección, aprenderá a modificar los scripts de entrenamiento de TensorFlow para configurar la biblioteca de paralelismo de modelos de SageMaker para la partición automática y la partición manual. Esta selección de ejemplos incluye también un ejemplo integrado con Horovod para modelos híbridos y paralelismo de datos.
nota
Para saber qué versiones de TensorFlow son compatibles con la biblioteca, consulte Marcos admitidos y Regiones de AWS.
Las modificaciones necesarias que debe realizar en el script de entrenamiento para utilizar la biblioteca se enumeran en División automatizada con TensorFlow.
Para obtener información sobre cómo modificar el script de entrenamiento para utilizar el modelo híbrido y el paralelismo de datos con Horovod, consulte División automatizada con TensorFlow y Horovod para un modelo híbrido y paralelismo de datos.
Si quiere usar particiones manuales, revise también División manual con TensorFlow.
Los siguientes temas muestran ejemplos de scripts de entrenamiento que puede usar para configurar la biblioteca de paralelismo de modelos de SageMaker para la partición automática y manual de modelos de TensorFlow.
nota
La partición automática está habilitada de forma predeterminada. A menos que se especifique lo contrario, los scripts de ejemplo utilizan la partición automática.
Temas
División automatizada con TensorFlow
Se requieren los siguientes cambios en el script de entrenamiento para ejecutar un modelo de TensorFlow con la biblioteca de paralelismo de modelos de SageMaker:
-
Importe e inicialice la biblioteca con
smp.init(). -
Definir el modelo de Keras que se hereda de
smp.DistributedModelen lugar de la clase de modelo Keras. Devolver las salidas del modelo desde el método de llamada del objeto smp.DistributedModel. Tenga en cuenta que los tensores devueltos del método de llamada se transmitirán a través de dispositivos de paralelismo de modelos, lo que supondrá una sobrecarga de comunicación, por lo que no se debe devolver los tensores que no sean necesarios fuera del método de llamada (como las activaciones intermedias). -
Establezca
drop_remainder=Trueen el métodotf.Dataset.batch(). Esto sirve para garantizar que el tamaño del lote sea siempre divisible por el número de microlotes. -
Comience las operaciones aleatorias en la canalización de datos usando
smp.dp_rank(), por ejemplo,shuffle(ds, seed=smp.dp_rank()), para garantizar la coherencia de las muestras de datos en las GPU que contienen particiones de modelos diferentes. -
Coloque la lógica hacia adelante y hacia atrás en una función de paso y decórela con
smp.step. -
Realice un procesamiento posterior en las salidas de los microlotes mediante los métodos
StepOutputtales como reduce_mean. La funciónsmp.stepdebe tener un valor devuelto que depende de la salida de smp.DistributedModel. -
Si hay algún paso de evaluación, coloque de manera similar la lógica de avance dentro de una función decorada
smp.stepy procese posteriormente las salidas utilizando la APIStepOutput.
Para obtener más información sobre la API de biblioteca de paralelismo de modelos de SageMaker, consulte la Documentación API
El siguiente script de Python es un ejemplo de un script de entrenamiento después de realizar los cambios.
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Si ha terminado de preparar el script de entrenamiento, continúe con Paso 2: lance un trabajo de entrenamiento con el SageMaker Python SDK. Si desea ejecutar un trabajo de entrenamiento paralelo de datos y modelo híbrido, proceda con la siguiente sección.
División automatizada con TensorFlow y Horovod para un modelo híbrido y paralelismo de datos
Puede utilizar la biblioteca de paralelismo de modelos de SageMaker con Horovod para el paralelismo de datos y modelos híbridos. Para obtener más información sobre cómo la biblioteca divide un modelo para el paralelismo híbrido, consulte Paralelismo de canalización (disponible para PyTorch y TensorFlow).
En este paso, nos centraremos en cómo modificar el script de entrenamiento para adaptar la biblioteca de paralelismo de modelos de SageMaker.
Para configurar correctamente su script de entrenamiento para que recoja la configuración de paralelismo híbrido que utilizará en Paso 2: lance un trabajo de entrenamiento con el SageMaker Python SDK, utilice las funciones auxiliares de la biblioteca, smp.dp_rank() y smp.mp_rank(), que detectan automáticamente el rango de paralelismo de datos y el rango de paralelismo de modelos, respectivamente.
Para encontrar todas las primitivas de MPI que admite la biblioteca, consulte Conceptos básicos de MPI
Los cambios principales necesarios en el script son:
-
Añadir
hvd.allreduce -
Variables de radiodifusión después del primer lote, según lo requerido por Horovod
-
Propagación de operaciones de partición y/o fragmentación en la canalización de datos con
smp.dp_rank().
nota
Cuando utilice Horovod, no debe llamar directamente hvd.init en su script de entrenamiento. En su lugar, tendrá que configurar "horovod" en True en los parámetros modelparallel de Python SDK en Paso 2: lance un trabajo de entrenamiento con el SageMaker Python SDK. Esto permite que la biblioteca inicialice Horovod internamente en función de las asignaciones de dispositivos de las particiones del modelo. Llamar hvd.init() directamente a su script de entrenamiento puede provocar problemas.
nota
El uso de la API hvd.DistributedOptimizer directamente en el script de entrenamiento puede provocar un rendimiento y una velocidad de entrenamiento deficientes, ya que la API incluye implícitamente la operación AllReduce en smp.step. Le recomendamos utilizar la biblioteca de paralelismo de modelos con Horovod llamando hvd.allreduce directamente después de llamar a accumulate() o reduce_mean() o en los gradientes devueltos por smp.step, como se mostrará en el siguiente ejemplo.
Para obtener más información sobre la API de biblioteca de paralelismo de modelos de SageMaker, consulte la Documentación API
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
División manual con TensorFlow
Uso de gestores de contexto de smp.partition para colocar las operaciones en una partición específica. Toda operación no colocada en ningún contexto smp.partition se colocará en default_partition. Para obtener más información sobre la API de biblioteca de paralelismo de modelos de SageMaker, consulte la Documentación API
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Características del marco no compatibles
La biblioteca no admite las siguientes funciones de TensorFlow:
-
Actualmente
tf.GradientTape()no es compatible. Puede usarOptimizer.get_gradients()oOptimizer.compute_gradients()en lugar de gradiente informáticos. -
Actualmente, la API
tf.train.Checkpoint.restore()no es compatible. Para los puntos de control, utilicesmp.CheckpointManageren su lugar, que proporciona la misma API y funcionalidad. Tenga en cuenta que las restauraciones del punto de control consmp.CheckpointManagerdeben realizarse después del primer paso.