Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Création d’un pipeline avec des fonctions décorées avec @step
Vous pouvez créer un pipeline en convertissant les fonctions Python en étapes de pipeline à l’aide du décorateur @step, en créant des dépendances entre ces fonctions pour créer un graphe de pipeline (ou graphe orienté acyclique (DAG)) et en transmettant les nœuds terminaux de ce graphe sous forme de liste d’étapes au pipeline. Les sections suivantes expliquent cette procédure en détail avec des exemples.
Rubriques
Conversion d’une fonction en étape
Pour créer une étape à l’aide du décorateur @step, annotez la fonction avec @step. L’exemple suivant illustre une fonction décorée avec @step qui prétraite les données.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Lorsque vous invoquez une fonction @step décorée, SageMaker AI renvoie une DelayedReturn instance au lieu d'exécuter la fonction. Une instance DelayedReturn est un proxy pour le retour réel de cette fonction. L’instance DelayedReturn peut être transmise à une autre fonction en tant qu’argument ou directement à une instance de pipeline en tant qu’étape. Pour plus d'informations sur la DelayedReturn classe, consultez sagemaker.workflow.function_step. DelayedReturn
Création de dépendances entre les étapes
Lorsque vous créez une dépendance entre deux étapes, vous créez une connexion entre les étapes de votre graphe de pipeline. Les sections suivantes présentent plusieurs manières de créer une dépendance entre les étapes de votre pipeline.
Dépendances des données via des arguments d’entrée
Le fait de transmettre la sortie DelayedReturn d’une fonction en entrée d’une autre fonction crée automatiquement une dépendance de données dans le graphique DAG du pipeline. Dans l’exemple suivant, la transmission de la sortie DelayedReturn de la fonction preprocess à la fonction train crée une dépendance entre preprocess et train.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
L’exemple précédent définit une fonction d’entraînement qui est décorée avec @step. Lorsque cette fonction est invoquée, elle reçoit la sortie DelayedReturn de l’étape de pipeline de prétraitement en entrée. L’invocation de la fonction d’entraînement renvoie une autre instance DelayedReturn. Cette instance contient les informations sur toutes les étapes précédentes, définies dans cette fonction (c’est-à-dire l’étape preprocess de cet exemple), qui forment le graphique DAG du pipeline.
Dans l’exemple précédent, la fonction preprocess renvoie une valeur unique. Pour des types de retour plus complexes tels que des listes ou des tuples, reportez-vous à Limitations.
Définition de dépendances personnalisées
Dans l’exemple précédent, la fonction train a reçu la sortie DelayedReturn de preprocess et a créé une dépendance. Si vous souhaitez définir cette dépendance de manière explicite sans transmettre la sortie de l’étape précédente, utilisez la fonction add_depends_on avec l’étape. Vous pouvez utiliser la fonction get_step() pour extraire l’étape sous-jacente depuis son instance DelayedReturn, puis appeler add_depends_on avec la dépendance en entrée. Pour visualiser la définition de la fonction get_step(), consultez sagemaker.workflow.step_outputs.get_steppreprocess et train en utilisant get_step() et add_depends_on().
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Transmission de données depuis et vers une fonction décorée avec @step vers une étape de pipeline traditionnelle
Vous pouvez créer un pipeline qui inclut une étape décorée avec @step et une étape de pipeline traditionnelle et qui transmet des données entre les deux. Par exemple, vous pouvez utiliser ProcessingStep pour traiter les données et transmettre le résultat à la fonction d’entraînement décorée avec @step. Dans l’exemple suivant, une étape d’entraînement décorée avec @step fait référence à la sortie d’une étape de traitement.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a@step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Utilisation de ConditionStep avec des étapes décorées avec @step
Pipelines prend en charge une classe ConditionStep qui évalue les résultats des étapes précédentes pour décider de l’action à entreprendre dans le pipeline. Vous pouvez également utiliser ConditionStep avec une étape décorée avec @step. Pour utiliser la sortie d’une étape @step quelconque décorée avec ConditionStep, entrez la sortie de cette étape en tant qu’argument de ConditionStep. Dans l’exemple suivant, l’étape de condition reçoit la sortie de l’étape d’évaluation des modèles décorée avec @step.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Définition d’un pipeline à l’aide de la sortie DelayedReturn des étapes
Vous définissez un pipeline de la même manière, que vous utilisiez ou non un décorateur @step. Lorsque vous transmettez une instance DelayedReturn à votre pipeline, vous n’avez pas besoin de transmettre la liste complète des étapes pour générer le pipeline. Le kit SDK déduit automatiquement les étapes précédentes en fonction des dépendances que vous définissez. Toutes les étapes précédentes des objets Step que vous avez transmis au pipeline ou aux objets DelayedReturn sont incluses dans le graphique du pipeline. Dans l’exemple suivant, le pipeline reçoit l’objet DelayedReturn pour la fonction train. SageMaker L'IA ajoute l'preprocessétape, en tant qu'étape précédente detrain, au graphe du pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )
S’il n’existe aucune donnée ou dépendance personnalisée entre les étapes et que vous exécutez plusieurs étapes en parallèle, le graphe du pipeline comporte plusieurs nœuds terminaux. Transmettez tous ces nœuds terminaux dans une liste à l’argument steps de votre définition de pipeline, comme indiqué dans l’exemple suivant :
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )
Lorsque le pipeline s’exécute, les deux étapes s’exécutent en parallèle.
Vous transmettez uniquement les nœuds terminaux du graphe au pipeline, car ils contiennent des informations sur toutes les étapes précédentes définies par le biais de dépendances de données ou personnalisées. Lorsqu'elle compile le pipeline, l' SageMaker IA déduit également toutes les étapes suivantes qui forment le graphe du pipeline et ajoute chacune d'elles en tant qu'étape distincte au pipeline.
Crée un pipeline.
Créez un pipeline en appelant pipeline.create(), comme illustré dans l’extrait suivant. Pour plus de détails sur create(), consultez sagemaker.workflow.pipeline.Pipeline.create
role = "pipeline-role" pipeline.create(role)
Lorsque vous appelezpipeline.create(), SageMaker AI compile toutes les étapes définies dans le cadre de l'instance de pipeline. SageMaker L'IA télécharge la fonction sérialisée, les arguments et tous les autres artefacts liés aux étapes sur Amazon S3.
Les données résident dans le compartiment S3 selon la structure suivante :
s3_root_uri/pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name/timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id/step_name/ results # returned output from the serialized function including the model
s3_root_uriest défini dans le fichier de configuration SageMaker AI et s'applique à l'ensemble du pipeline. S'il n'est pas défini, le bucket SageMaker AI par défaut est utilisé.
Note
Chaque fois que l' SageMaker IA compile un pipeline, elle SageMaker enregistre les fonctions sérialisées, les arguments et les dépendances des étapes dans un dossier horodaté avec l'heure actuelle. Cela se produit chaque fois que vous exécutez pipeline.create(), pipeline.update(), pipeline.upsert() ou pipeline.definition().