Esegui un processo di addestramento parallelo di un modello distribuito di SageMaker con il parallelismo tensoriale - Amazon SageMaker AI

Esegui un processo di addestramento parallelo di un modello distribuito di SageMaker con il parallelismo tensoriale

In questa sezione, imparerai:

  • Come configurare uno strumento di valutazione SageMaker PyTorch e l'opzione di parallelismo del modello SageMaker per utilizzare il parallelismo tensoriale.

  • Come adattare lo script di addestramento utilizzando i moduli smdistributed.modelparallel estesi per il parallelismo tensoriale.

Per saperne di più sui moduli smdistributed.modelparallel, consulta le API parallele del modello SageMaker nella documentazione dell'SDK Python di SageMaker.

Solo parallelismo tensoriale

Quello che segue è un esempio di un opzione di addestramento distribuito per attivare il solo parallelismo tensoriale, senza il parallelismo della pipeline. Configura i dizionari mpi_options e smp_options per specificare le opzioni di addestramento distribuito nello strumento di valutazione SageMaker PyTorch.

Nota

Le funzionalità di risparmio di memoria estese sono disponibili tramite i container di Deep Learning per PyTorch, che implementano la libreria di parallelismo dei modelli SageMaker v1.6.0 o versioni successive.

Configurare uno strumento di valutazione SageMaker PyTorch

mpi_options = { "enabled" : True, "processes_per_host" : 8,               # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " }                 smp_options = { "enabled":True, "parameters": { "pipeline_parallel_degree": 1,    # alias for "partitions" "placement_strategy": "cluster", "tensor_parallel_degree": 4,      # tp over 4 devices "ddp": True } }                smp_estimator = PyTorch(    entry_point='your_training_script.py', # Specify    role=role,    instance_type='ml.p3.16xlarge',    sagemaker_session=sagemaker_session,    framework_version='1.13.1', py_version='py36',    instance_count=1,    distribution={        "smdistributed": {"modelparallel": smp_options},        "mpi": mpi_options    },    base_job_name="SMD-MP-demo", ) smp_estimator.fit('s3://my_bucket/my_training_data/')
Suggerimento

Per trovare un elenco completo dei parametri per distribution, consulta Parametri di configurazione per il parallelismo dei modelli nella documentazione di SDK Python di SageMaker.

Adatta il tuo script di addestramento PyTorch

Il seguente esempio di script di addestramento mostra come adattare la libreria di parallelismo dei modelli SageMaker a uno script di addestramento. In questo esempio, si presume che lo script abbia il nome your_training_script.py.

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module):     def __init__(self):         super(Net, self).__init__()         self.conv1 = nn.Conv2d(1, 32, 3, 1)         self.conv2 = nn.Conv2d(32, 64, 3, 1)         self.fc1 = nn.Linear(9216, 128)         self.fc2 = nn.Linear(128, 10)     def forward(self, x):         x = self.conv1(x)         x = F.relu(x)         x = self.conv2(x)         x = F.relu(x)         x = F.max_pool2d(x, 2)         x = torch.flatten(x, 1)         x = self.fc1(x)         x = F.relu(x)         x = self.fc2(x)         return F.log_softmax(x, 1) def train(model, device, train_loader, optimizer):     model.train()     for batch_idx, (data, target) in enumerate(train_loader):         # smdistributed: Move input tensors to the GPU ID used by         # the current process, based on the set_device call.         data, target = data.to(device), target.to(device)         optimizer.zero_grad()         output = model(data)         loss = F.nll_loss(output, target, reduction="mean")         loss.backward()         optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0:     dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1:     partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}     dataset = SplitDataset(dataset, partitions=partitions_dict)     dataset.select(f"{smp.dp_rank()}") train_loader = torch.utils.data.DataLoader(dataset, batch_size=64) # smdistributed: Enable tensor parallelism for all supported modules in the model # i.e., nn.Linear in this case. Alternatively, we can use # smp.set_tensor_parallelism(model.fc1, True) # to enable it only for model.fc1 with smp.tensor_parallelism():     model = Net() # smdistributed: Use the DistributedModel wrapper to distribute the # modules for which tensor parallelism is enabled model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)

Parallelismo tensoriale combinato con parallelismo di pipeline

Di seguito è riportato un esempio di opzione di addestramento distribuito che abilita il parallelismo tensoriale combinato con il parallelismo della pipeline. Imposta i parametri mpi_options e smp_options per specificare le opzioni parallele del modello con parallelismo tensoriale quando configuri uno strumento di valutazione SageMaker PyTorch.

Nota

Le funzionalità di risparmio di memoria estese sono disponibili tramite i container di Deep Learning per PyTorch, che implementano la libreria di parallelismo dei modelli SageMaker v1.6.0 o versioni successive.

Configurare uno strumento di valutazione SageMaker PyTorch

mpi_options = { "enabled" : True, "processes_per_host" : 8,               # 8 processes "custom_mpi_options" : "--mca btl_vader_single_copy_mechanism none " }                 smp_options = { "enabled":True, "parameters": { "microbatches": 4, "pipeline_parallel_degree": 2,    # alias for "partitions" "placement_strategy": "cluster", "tensor_parallel_degree": 2,      # tp over 2 devices "ddp": True } }                smp_estimator = PyTorch(    entry_point='your_training_script.py', # Specify    role=role,    instance_type='ml.p3.16xlarge',    sagemaker_session=sagemaker_session,    framework_version='1.13.1', py_version='py36',    instance_count=1,    distribution={        "smdistributed": {"modelparallel": smp_options},        "mpi": mpi_options    },    base_job_name="SMD-MP-demo", ) smp_estimator.fit('s3://my_bucket/my_training_data/')  

Adatta il tuo script di addestramento PyTorch

Il seguente esempio di script di addestramento mostra come adattare la libreria di parallelismo dei modelli SageMaker a uno script di addestramento. Nota che lo script di addestramento ora include il decoratore smp.step:

import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchnet.dataset import SplitDataset from torchvision import datasets import smdistributed.modelparallel.torch as smp class Net(nn.Module):     def __init__(self):         super(Net, self).__init__()         self.conv1 = nn.Conv2d(1, 32, 3, 1)         self.conv2 = nn.Conv2d(32, 64, 3, 1)         self.fc1 = nn.Linear(9216, 128)         self.fc2 = nn.Linear(128, 10)     def forward(self, x):         x = self.conv1(x)         x = F.relu(x)         x = self.conv2(x)         x = F.relu(x)         x = F.max_pool2d(x, 2)         x = torch.flatten(x, 1)         x = self.fc1(x)         x = F.relu(x)         x = self.fc2(x)         return F.log_softmax(x, 1) # smdistributed: Define smp.step. Return any tensors needed outside. @smp.step def train_step(model, data, target):     output = model(data)     loss = F.nll_loss(output, target, reduction="mean")     model.backward(loss)     return output, loss def train(model, device, train_loader, optimizer):     model.train()     for batch_idx, (data, target) in enumerate(train_loader):         # smdistributed: Move input tensors to the GPU ID used by         # the current process, based on the set_device call.         data, target = data.to(device), target.to(device)         optimizer.zero_grad()         # Return value, loss_mb is a StepOutput object         _, loss_mb = train_step(model, data, target)         # smdistributed: Average the loss across microbatches.         loss = loss_mb.reduce_mean()         optimizer.step() # smdistributed: Initialize the backend smp.init() # smdistributed: Set the device to the GPU ID used by the current process. # Input tensors should be transferred to this device. torch.cuda.set_device(smp.local_rank()) device = torch.device("cuda") # smdistributed: Download only on a single process per instance. # When this is not present, the file is corrupted by multiple processes trying # to download and extract at the same time if smp.local_rank() == 0:     dataset = datasets.MNIST("../data", train=True, download=False) smp.barrier() # smdistributed: Shard the dataset based on data parallel ranks if smp.dp_size() > 1:     partitions_dict = {f"{i}": 1 / smp.dp_size() for i in range(smp.dp_size())}     dataset = SplitDataset(dataset, partitions=partitions_dict)     dataset.select(f"{smp.dp_rank()}") # smdistributed: Set drop_last=True to ensure that batch size is always divisible # by the number of microbatches train_loader = torch.utils.data.DataLoader(dataset, batch_size=64, drop_last=True) model = Net() # smdistributed: enable tensor parallelism only for model.fc1 smp.set_tensor_parallelism(model.fc1, True) # smdistributed: Use the DistributedModel container to provide the model # to be partitioned across different ranks. For the rest of the script, # the returned DistributedModel object should be used in place of # the model provided for DistributedModel class instantiation. model = smp.DistributedModel(model) optimizer = optim.AdaDelta(model.parameters(), lr=4.0) optimizer = smp.DistributedOptimizer(optimizer) train(model, device, train_loader, optimizer)