

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Création d’un pipeline avec des fonctions décorées avec `@step`
<a name="pipelines-step-decorator-create-pipeline"></a>

Vous pouvez créer un pipeline en convertissant les fonctions Python en étapes de pipeline à l’aide du décorateur `@step`, en créant des dépendances entre ces fonctions pour créer un graphe de pipeline (ou graphe orienté acyclique (DAG)) et en transmettant les nœuds terminaux de ce graphe sous forme de liste d’étapes au pipeline. Les sections suivantes expliquent cette procédure en détail avec des exemples.

**Topics**
+ [Conversion d’une fonction en étape](#pipelines-step-decorator-run-pipeline-convert)
+ [Création de dépendances entre les étapes](#pipelines-step-decorator-run-pipeline-link)
+ [Utilisation de `ConditionStep` avec des étapes décorées avec `@step`](#pipelines-step-decorator-condition)
+ [Définition d’un pipeline à l’aide de la sortie `DelayedReturn` des étapes](#pipelines-step-define-delayed)
+ [Création d’un pipeline](#pipelines-step-decorator-pipeline-create)

## Conversion d’une fonction en étape
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

Pour créer une étape à l’aide du décorateur `@step`, annotez la fonction avec `@step`. L’exemple suivant illustre une fonction décorée avec `@step` qui prétraite les données.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

Lorsque vous invoquez une fonction `@step` décorée, SageMaker AI renvoie une `DelayedReturn` instance au lieu d'exécuter la fonction. Une instance `DelayedReturn` est un proxy pour le retour réel de cette fonction. L’instance `DelayedReturn` peut être transmise à une autre fonction en tant qu’argument ou directement à une instance de pipeline en tant qu’étape. Pour plus d'informations sur la `DelayedReturn` classe, consultez [sagemaker.workflow.function\$1step. DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn).

## Création de dépendances entre les étapes
<a name="pipelines-step-decorator-run-pipeline-link"></a>

Lorsque vous créez une dépendance entre deux étapes, vous créez une connexion entre les étapes de votre graphe de pipeline. Les sections suivantes présentent plusieurs manières de créer une dépendance entre les étapes de votre pipeline.

### Dépendances des données via des arguments d’entrée
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

Le fait de transmettre la sortie `DelayedReturn` d’une fonction en entrée d’une autre fonction crée automatiquement une dépendance de données dans le graphique DAG du pipeline. Dans l’exemple suivant, la transmission de la sortie `DelayedReturn` de la fonction `preprocess` à la fonction `train` crée une dépendance entre `preprocess` et `train`.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

L’exemple précédent définit une fonction d’entraînement qui est décorée avec `@step`. Lorsque cette fonction est invoquée, elle reçoit la sortie `DelayedReturn` de l’étape de pipeline de prétraitement en entrée. L’invocation de la fonction d’entraînement renvoie une autre instance `DelayedReturn`. Cette instance contient les informations sur toutes les étapes précédentes, définies dans cette fonction (c’est-à-dire l’étape `preprocess` de cet exemple), qui forment le graphique DAG du pipeline.

Dans l’exemple précédent, la fonction `preprocess` renvoie une valeur unique. Pour des types de retour plus complexes tels que des listes ou des tuples, reportez-vous à [Limitations](pipelines-step-decorator-limit.md).

### Définition de dépendances personnalisées
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

Dans l’exemple précédent, la fonction `train` a reçu la sortie `DelayedReturn` de `preprocess` et a créé une dépendance. Si vous souhaitez définir cette dépendance de manière explicite sans transmettre la sortie de l’étape précédente, utilisez la fonction `add_depends_on` avec l’étape. Vous pouvez utiliser la fonction `get_step()` pour extraire l’étape sous-jacente depuis son instance `DelayedReturn`, puis appeler `add_depends_on` avec la dépendance en entrée. Pour visualiser la définition de la fonction `get_step()`, consultez [sagemaker.workflow.step\$1outputs.get\$1step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step). L’exemple suivant montre comment créer une dépendance entre `preprocess` et `train` en utilisant `get_step()` et `add_depends_on()`.

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### Transmission de données depuis et vers une fonction décorée avec `@step` vers une étape de pipeline traditionnelle
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

Vous pouvez créer un pipeline qui inclut une étape décorée avec `@step` et une étape de pipeline traditionnelle et qui transmet des données entre les deux. Par exemple, vous pouvez utiliser `ProcessingStep` pour traiter les données et transmettre le résultat à la fonction d’entraînement décorée avec `@step`. Dans l’exemple suivant, une étape d’entraînement décorée avec `@step` fait référence à la sortie d’une étape de traitement.

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## Utilisation de `ConditionStep` avec des étapes décorées avec `@step`
<a name="pipelines-step-decorator-condition"></a>

Pipelines prend en charge une classe `ConditionStep` qui évalue les résultats des étapes précédentes pour décider de l’action à entreprendre dans le pipeline. Vous pouvez également utiliser `ConditionStep` avec une étape décorée avec `@step`. Pour utiliser la sortie d’une étape `@step` quelconque décorée avec `ConditionStep`, entrez la sortie de cette étape en tant qu’argument de `ConditionStep`. Dans l’exemple suivant, l’étape de condition reçoit la sortie de l’étape d’évaluation des modèles décorée avec `@step`.

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## Définition d’un pipeline à l’aide de la sortie `DelayedReturn` des étapes
<a name="pipelines-step-define-delayed"></a>

Vous définissez un pipeline de la même manière, que vous utilisiez ou non un décorateur `@step`. Lorsque vous transmettez une instance `DelayedReturn` à votre pipeline, vous n’avez pas besoin de transmettre la liste complète des étapes pour générer le pipeline. Le kit SDK déduit automatiquement les étapes précédentes en fonction des dépendances que vous définissez. Toutes les étapes précédentes des objets `Step` que vous avez transmis au pipeline ou aux objets `DelayedReturn` sont incluses dans le graphique du pipeline. Dans l’exemple suivant, le pipeline reçoit l’objet `DelayedReturn` pour la fonction `train`. SageMaker L'IA ajoute l'`preprocess`étape, en tant qu'étape précédente de`train`, au graphe du pipeline.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

S’il n’existe aucune donnée ou dépendance personnalisée entre les étapes et que vous exécutez plusieurs étapes en parallèle, le graphe du pipeline comporte plusieurs nœuds terminaux. Transmettez tous ces nœuds terminaux dans une liste à l’argument `steps` de votre définition de pipeline, comme indiqué dans l’exemple suivant :

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=sagemaker-session,
)
```

Lorsque le pipeline s’exécute, les deux étapes s’exécutent en parallèle.

Vous transmettez uniquement les nœuds terminaux du graphe au pipeline, car ils contiennent des informations sur toutes les étapes précédentes définies par le biais de dépendances de données ou personnalisées. Lorsqu'elle compile le pipeline, l' SageMaker IA déduit également toutes les étapes suivantes qui forment le graphe du pipeline et ajoute chacune d'elles en tant qu'étape distincte au pipeline.

## Création d’un pipeline
<a name="pipelines-step-decorator-pipeline-create"></a>

Créez un pipeline en appelant `pipeline.create()`, comme illustré dans l’extrait suivant. Pour plus de détails sur `create()`, consultez [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create).

```
role = "pipeline-role"
pipeline.create(role)
```

Lorsque vous appelez`pipeline.create()`, SageMaker AI compile toutes les étapes définies dans le cadre de l'instance de pipeline. SageMaker L'IA télécharge la fonction sérialisée, les arguments et tous les autres artefacts liés aux étapes sur Amazon S3.

Les données résident dans le compartiment S3 selon la structure suivante :

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri`est défini dans le fichier de configuration SageMaker AI et s'applique à l'ensemble du pipeline. S'il n'est pas défini, le bucket SageMaker AI par défaut est utilisé.

**Note**  
Chaque fois que l' SageMaker IA compile un pipeline, elle SageMaker enregistre les fonctions sérialisées, les arguments et les dépendances des étapes dans un dossier horodaté avec l'heure actuelle. Cela se produit chaque fois que vous exécutez `pipeline.create()`, `pipeline.update()`, `pipeline.upsert()` ou `pipeline.definition()`.