Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Modifica di uno script di addestramento TensorFlow
In questa sezione, imparerai come modificare gli script di addestramento TensorFlow per configurare la libreria di parallelismo dei modelli SageMaker per il partizionamento automatico e il partizionamento manuale. Gli esempi scelti comprendono anche un esempio integrato con Horovod per il modello ibrido e il parallelismo dei dati.
Nota
Per scoprire quali versioni di TensorFlow sono supportate dalla libreria, consulta Framework e Regioni AWS supportati.
Le modifiche necessarie da apportare allo script di addestramento per utilizzare la libreria sono elencate in Suddivisione automatica con TensorFlow.
Per maggiori informazioni su come modificare lo script di addestramento per utilizzare il modello ibrido e il parallelismo dei dati con Horovod, consulta Suddivisione automatizzata con TensorFlow e Horovod per il parallelismo ibrido di dati e modelli.
Se desideri utilizzare il partizionamento manuale, consulta anche Suddivisione manuale con TensorFlow.
Gli argomenti seguenti mostrano esempi di script di addestramento che è possibile utilizzare per configurare la libreria di parallelismo dei modelli SageMaker per il partizionamento automatico e il partizionamento manuale dei modelli TensorFlow.
Nota
Il partizionamento automatico è abilitato come impostazione predefinita. Se non diversamente specificato, gli script di esempio utilizzano il partizionamento automatico.
Argomenti
Suddivisione automatica con TensorFlow
Le seguenti modifiche allo script di addestramento sono necessarie per eseguire un modello TensorFlow con la libreria di parallelismo dei modelli di SageMaker:
-
Importa e inizializza la libreria con
smp.init(). -
Definisci un modello Keras ereditandolo da
smp.DistributedModelanziché dalla classe di modelli Keras. Restituisci gli output del modello dal metodo di chiamata dell'oggetto smp.DistributedModel. Tieni presente che tutti i tensori restituiti dal metodo di chiamata verranno trasmessi su dispositivi paralleli al modello, con un sovraccarico di comunicazione, quindi non dovrebbe essere restituito alcun tensore che non è necessario al di fuori del metodo di chiamata (come le attivazioni intermedie) . -
Imposta
drop_remainder=Truenel metodotf.Dataset.batch(). Ciò serve a garantire che la dimensione del batch sia sempre divisibile per il numero di microbatch. -
Fornisci i seed alle operazioni casuali nella pipeline di dati utilizzando
smp.dp_rank(), ad esempio,shuffle(ds, seed=smp.dp_rank())per garantire la coerenza dei campioni di dati tra le GPU che contengono partizioni di modelli diversi. -
Inserisci la logica forward e backward in una funzione a fasi e decorala con
smp.step. -
Esegui la post-elaborazione sugli output tra i microbatch utilizzando metodi
StepOutputcome reduce_mean. La funzionesmp.stepdeve restituire un valore che dipende dall'output di smp.DistributedModel. -
Se è presente una fase di valutazione, inserisci in modo analogo la logica forward all'interno di una funzione decorata
smp.stepe post-elabora gli output utilizzando l'APIStepOutput.
Per saperne di più sull'API della libreria di parallelismo dei modelli di SageMaker, consulta la documentazione API
Il seguente script Python è un esempio di script di addestramento dopo l'introduzione delle modifiche.
import tensorflow as tf # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return the model output model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Se hai finito di preparare lo script di addestramento, procedi con Fase 2: avvio di un processo di addestramento utilizzando SageMaker SDK Python. Se desideri eseguire un modello ibrido e un processo di addestramento per il parallelismo dei dati, continua con la sezione successiva.
Suddivisione automatizzata con TensorFlow e Horovod per il parallelismo ibrido di dati e modelli
È possibile utilizzare la libreria di parallelismo dei modelli SageMaker con Horovod per il parallelismo ibrido di modelli e dati. Per saperne di più su come la libreria suddivide un modello per il parallelismo ibrido, consulta Parallelizzazione della pipeline (disponibile per Pytorch e TensorFlow).
In questa fase, ci concentriamo sul modo per modificare il proprio script di addestramento per adattare la libreria di parallelismo dei modelli SageMaker.
Per configurare correttamente il proprio script di addestramento per acquisire la configurazione di parallelismo ibrido che sarà impostata in Fase 2: avvio di un processo di addestramento utilizzando SageMaker SDK Python, usa le funzioni di supporto della libreria, smp.dp_rank() e smp.mp_rank(), che rilevano automaticamente rispettivamente la classificazione parallela dei dati e la classificazione parallela dei modelli.
Per trovare tutte le primitive MPI supportate dalla libreria, consulta MPI Basics
Le modifiche richieste nello script sono:
-
Aggiunta di
hvd.allreduce -
Variabili di trasmissione dopo il primo batch, come richiesto da Horovod
-
Operazioni di partizionamento e/o shuffling dei seed nella pipeline di dati con
smp.dp_rank().
Nota
Quando usi Horovod, non devi chiamare hvd.init direttamente nel tuo script di addestramento. Dovrai invece impostare "horovod" su True nei parametri modelparallel dell'SDK Python SageMaker in Fase 2: avvio di un processo di addestramento utilizzando SageMaker SDK Python. Ciò consente alla libreria di inizializzare internamente Horovod in base alle assegnazioni dei dispositivi delle partizioni del modello. La chiamata di hvd.init() direttamente nello script di addestramento può causare problemi.
Nota
L'utilizzo dell'API hvd.DistributedOptimizer direttamente nello script di addestramento potrebbe comportare velocità e prestazioni di addestramento scadenti, poiché l'API inserisce implicitamente l'operazione AllReduce all'interno di smp.step. Ti consigliamo di utilizzare la libreria di parallelismo dei modelli con Horovod chiamando direttamente hvd.allreduce dopo aver chiamato accumulate() o reduce_mean() sui gradienti restituiti da smp.step, come verrà mostrato nell'esempio seguente.
Per saperne di più sull'API della libreria di parallelismo dei modelli di SageMaker, consulta la documentazione API
import tensorflow as tf import horovod.tensorflow as hvd # smdistributed: Import TF2.x API import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: Seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API class MyModel(smp.DistributedModel): def __init__(self): super(MyModel, self).__init__() # define layers def call(self, x, training=None): # define forward pass and return model outputs model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels, first_batch): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches # Horovod: AllReduce the accumulated gradients gradients = [hvd.allreduce(g.accumulate()) for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # Horovod: Broadcast the variables after first batch if first_batch: hvd.broadcast_variables(model.variables, root_rank=0) hvd.broadcast_variables(optimizer.variables(), root_rank=0) # smdistributed: Merge predictions across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for batch, (images, labels) in enumerate(train_ds): loss = train_step(images, labels, tf.constant(batch == 0))
Suddivisione manuale con TensorFlow
Usa i gestori di contesto smp.partition per inserire le operazioni in una partizione specifica. Qualsiasi operazione non inserita in alcun contesto smp.partition viene inserita in default_partition. Per saperne di più sull'API della libreria di parallelismo dei modelli di SageMaker, consulta la documentazione API
import tensorflow as tf # smdistributed: Import TF2.x API. import smdistributed.modelparallel.tensorflow as smp # smdistributed: Initialize smp.init() # Download and load MNIST dataset. (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data( "MNIST-data-%d" % smp.rank() ) x_train, x_test = x_train / 255.0, x_test / 255.0 # Add a channels dimension x_train = x_train[..., tf.newaxis] x_test = x_test[..., tf.newaxis] # smdistributed: If needed, seed the shuffle with smp.dp_rank(), and drop_remainder # in batching to make sure batch size is always divisible by number of microbatches. train_ds = ( tf.data.Dataset.from_tensor_slices((x_train, y_train)) .shuffle(10000, seed=smp.dp_rank()) .batch(256, drop_remainder=True) ) # smdistributed: Define smp.DistributedModel the same way as Keras sub-classing API. class MyModel(smp.DistributedModel): def __init__(self): # define layers def call(self, x): with smp.partition(0): x = self.layer0(x) with smp.partition(1): return self.layer1(x) model = MyModel() loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) optimizer = tf.keras.optimizers.Adam() train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name="train_accuracy") # smdistributed: Define smp.step. Return any tensors needed outside @smp.step def get_grads(images, labels): predictions = model(images, training=True) loss = loss_object(labels, predictions) grads = optimizer.get_gradients(loss, model.trainable_variables) return grads, loss, predictions @tf.function def train_step(images, labels): gradients, loss, predictions = get_grads(images, labels) # smdistributed: Accumulate the gradients across microbatches gradients = [g.accumulate() for g in gradients] optimizer.apply_gradients(zip(gradients, model.trainable_variables)) # smdistributed: Merge predictions and average losses across microbatches train_accuracy(labels, predictions.merge()) return loss.reduce_mean() for epoch in range(5): # Reset the metrics at the start of the next epoch train_accuracy.reset_states() for images, labels in train_ds: loss = train_step(images, labels) accuracy = train_accuracy.result()
Funzionalità del framework non supportate
Le seguenti funzionalità di TensorFlow non sono supportate dalla libreria:
-
tf.GradientTape()non è attualmente supportato. Puoi invece usareOptimizer.get_gradients()oOptimizer.compute_gradients()per calcolare i gradienti. -
L'API
tf.train.Checkpoint.restore()non è attualmente supportata. Per il checkpoint, usa invecesmp.CheckpointManager, che fornisce la stessa API e funzionalità. Tieni presente che il ripristino dei checkpoint consmp.CheckpointManagerdovrebbe avvenire dopo la prima fase.