Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exécution d'une tâche d'entraînement distribué avec parallélisme de modèles SageMaker
Dans cette section, vous allez apprendre :
-
à configurer un estimateur PyTorch SageMaker et l'option de parallélisme de modèles SageMaker pour utiliser le parallélisme de tenseur ;
-
à adapter le script d'entraînement à l'aide des modules
smdistributed.modelparallelétendus de parallélisme de tenseur.
Pour en savoir plus sur les modules smdistributed.modelparallel, consultez SageMaker model parallel APIs
Parallélisme de tenseur seul
Voici un exemple d'option d'entraînement distribué permettant d'activer uniquement le parallélisme de tenseur, sans parallélisme de pipeline. Configurez les dictionnaires mpi_options et smp_options pour spécifier des options d'entraînement distribué pour l'estimateur PyTorch SageMaker.
Note
Des fonctions d'économie de mémoire étendues sont disponibles via Deep Learning Containers pour PyTorch, qui implémente la bibliothèque de parallélisme de modèles SageMaker version 1.6.0 ou ultérieure.
Configuration d'un estimateur PyTorch SageMaker
mpi_options = { "enabled" : True, "processes_per_host" : 8, # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " } smp_options = { "enabled":True, "parameters": { "pipeline_parallel_degree": 1, # alias for "partitions" "placement_strategy": "cluster", "tensor_parallel_degree": 4, # tp over 4 devices "ddp": True } } smp_estimator = PyTorch( entry_point='your_training_script.py', # Specify role=role, instance_type='ml.p3.16xlarge', sagemaker_session=sagemaker_session, framework_version='1.13.1', py_version='py36', instance_count=1, distribution={ "smdistributed": {"modelparallel": smp_options}, "mpi": mpi_options }, base_job_name="SMD-MP-demo", ) smp_estimator.fit('s3://my_bucket/my_training_data/')
Astuce
Pour trouver une liste complète de paramètres pour la distribution, consultez les paramètres de configuration pour le parallélisme de modèle
Adaptation du script d'entraînement PyTorch
L'exemple de script d'entraînement suivant montre comment adapter la bibliothèque de parallélisme de modèles SageMaker à un script d'entraînement. Dans cet exemple, on suppose que le script est nommé your_training_script.py.
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 32, 3, 1) self.conv2 = nn.Conv2d(32, 64, 3, 1) self.fc1 = nn.Linear(9216, 128) self.fc2 = nn.Linear(128, 10) def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.relu(x) x = F.max_pool2d(x, 2) x = torch.flatten(x, 1) x = self.fc1(x) x = F.relu(x) x = self.fc2(x) return F.log_softmax(x, 1) def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by # the current process, based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target, reduction="mean") loss.backward() optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0: dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") train_loader = torch.utils.data.DataLoader(dataset, batch_size=64) # smdistributed: Enable tensor parallelism for all supported modules in the model # i.e., nn.Linear in this case. Alternatively, we can use # smp.set_tensor_parallelism(model.fc1, True) # to enable it only for model.fc1 with smp.tensor_parallelism(): model = Net() # smdistributed: Use the DistributedModel wrapper to distribute the # modules for which tensor parallelism is enabled model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)
Parallélisme de tenseur associé au parallélisme de pipeline
Voici un exemple d'option d'entraînement distribué qui active le parallélisme de tenseur combiné au parallélisme de pipeline. Configurez les paramètres mpi_options et smp_options pour spécifier les options de parallélisme de modèles avec un parallélisme de tenseur lorsque vous configurez un estimateur PyTorch SageMaker.
Note
Des fonctions d'économie de mémoire étendues sont disponibles via Deep Learning Containers pour PyTorch, qui implémente la bibliothèque de parallélisme de modèles SageMaker version 1.6.0 ou ultérieure.
Configuration d'un estimateur PyTorch SageMaker
mpi_options = { "enabled" : True, "processes_per_host" : 8, # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " } smp_options = { "enabled":True, "parameters": { "microbatches": 4,"pipeline_parallel_degree": 2, # alias for "partitions" "placement_strategy": "cluster","tensor_parallel_degree": 2, # tp over 2 devices "ddp": True } } smp_estimator = PyTorch( entry_point='your_training_script.py', # Specify role=role, instance_type='ml.p3.16xlarge', sagemaker_session=sagemaker_session, framework_version='1.13.1', py_version='py36', instance_count=1, distribution={ "smdistributed": {"modelparallel": smp_options}, "mpi": mpi_options }, base_job_name="SMD-MP-demo", ) smp_estimator.fit('s3://my_bucket/my_training_data/')
Adaptation du script d'entraînement PyTorch
L'exemple de script d'entraînement suivant montre comment adapter la bibliothèque de parallélisme de modèles SageMaker à un script d'entraînement. Notez que le script d'entraînement inclut désormais le décorateur smp.step :
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 32, 3, 1) self.conv2 = nn.Conv2d(32, 64, 3, 1) self.fc1 = nn.Linear(9216, 128) self.fc2 = nn.Linear(128, 10) def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.relu(x) x = F.max_pool2d(x, 2) x = torch.flatten(x, 1) x = self.fc1(x) x = F.relu(x) x = self.fc2(x) return F.log_softmax(x, 1) # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target): output = model(data) loss = F.nll_loss(output, target, reduction="mean") model.backward(loss) return output, loss def train(model, device, train_loader, optimizer): model.train() for batch_idx, (data, target) in enumerate(train_loader): # smdistributed: Move input tensors to the GPU ID used by # the current process, based on the set_device call. data, target = data.to(device), target.to(device) optimizer.zero_grad() # Return value, loss_mb is a StepOutput object _, loss_mb = train_step(model, data, target) # smdistributed: Average the loss across microbatches. loss = loss_mb.reduce_mean() optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0: dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1: partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())} dataset = SplitDataset(dataset, partitions=partitions_dict) dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = Net() # smdistributed: enable tensor parallelism only for model.fc1 smp.set_tensor_parallelism(model.fc1, True) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)