Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Creazione di una pipeline con funzioni decorate con @step
Puoi creare una pipeline convertendo le funzioni Python in fasi della pipeline con il decoratore @step, creando dipendenze tra tali funzioni per creare un grafo di pipeline (o un grafo aciclico orientato, DAG) e passando i nodi foglia del grafo come elenco di fasi alla pipeline. Le sezioni seguenti spiegano questa procedura in dettaglio con esempi.
Argomenti
Conversione di una funzione in una fase
Per creare una fase con il decoratore @step, annota la funzione con @step. L’esempio seguente mostra una funzione decorata con @step che pre-elabora i dati.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Quando richiami una funzione @step -decorata, SageMaker AI restituisce un'DelayedReturnistanza invece di eseguire la funzione. Un’istanza DelayedReturn è un proxy per la restituzione effettiva della funzione. L’istanza DelayedReturn può essere passata a un’altra funzione come argomento o direttamente a un’istanza di pipeline come fase. Per informazioni sulla DelayedReturn classe, vedete sagemaker.workflow.function_step. DelayedReturn
Creazione di dipendenze tra le fasi
Quando crei una dipendenza tra due fasi, crei una connessione tra le fasi nel grafo della pipeline. Le sezioni seguenti introducono diversi modi per creare una dipendenza tra le fasi della pipeline.
Dipendenze dei dati tramite argomenti di input
Il passaggio nell’output DelayedReturn di una funzione come input in un’altra funzione crea automaticamente una dipendenza dei dati nel DAG della pipeline. Nell’esempio seguente, il passaggio dell’output DelayedReturn della funzione preprocess alla funzione train crea una dipendenza tra preprocess e train.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
L’esempio precedente definisce una funzione di addestramento decorata con @step. Quando questa funzione viene invocata, riceve come input l’output DelayedReturn della fase della pipeline di pre-elaborazione. L’invocazione della funzione di addestramento restituisce un’altra istanza DelayedReturn. Questa istanza contiene le informazioni su tutti le fasi precedenti definite in quella funzione (ad esempio, la fase preprocess in questo esempio) che formano il DAG della pipeline.
Nell’esempio precedente, la funzione preprocess restituisce un valore singolo. Per tipi di restituzione più complessi come elenchi o tuple, consulta Limitazioni
Definizione delle dipendenze personalizzate
Nell’esempio precedente, la funzione train ha ricevuto l’output DelayedReturn di preprocess e ha creato una dipendenza. Se desideri definire la dipendenza in modo esplicito senza passare l’output della fase precedente, utilizza la funzione add_depends_on con la fase. Puoi utilizzare la funzione get_step() per recuperare la fase sottostante dalla relativa istanza DelayedReturn e quindi chiamare add_depends_on con la dipendenza come input. Per visualizzare la definizione della funzione get_step(), consulta sagemaker.workflow.step_outputs.get_steppreprocess e train utilizzando get_step() eadd_depends_on().
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Passaggio dei dati da e verso una funzione decorata con @step in una fase della pipeline tradizionale
Puoi creare una pipeline che include una fase decorata con @step e una fase della pipeline tradizionale e trasferire i dati tra le fasi. Ad esempio, puoi utilizzare per ProcessingStep elaborare i dati e passare il risultato alla funzione di addestramento decorata con @step. Nell’esempio seguente, una fase di addestramento decorata con @step fa riferimento all’output di una fase di elaborazione.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a@step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Utilizzo di ConditionStep con fasi decorate con @step
Pipelines supporta una classe ConditionStep che valuta i risultati delle fasi precedenti per decidere quali azioni intraprendere nella pipeline. Puoi utilizzare ConditionStep anche con una fase decorata con @step. Per utilizzare l’output di qualsiasi fase decorata con @step con ConditionStep, inserisci l’output della fase come argomento in ConditionStep. Nell’esempio seguente, la fase di condizione riceve l’output della fase di valutazione del modello decorata con @step.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Definizione di una pipeline con l’output DelayedReturn delle fasi
Definisci una pipeline allo stesso modo, indipendentemente dal fatto che utilizzi o meno un decoratore @step. Quando passi un’istanza DelayedReturn alla pipeline, non è necessario passare un elenco completo di fasi per creare la pipeline. L’SDK deduce automaticamente le fasi precedenti in base alle dipendenze definite. Tutte le fasi precedenti degli oggetti Step passati alla pipeline o agli oggetti DelayedReturn sono incluse nel grafo della pipeline. Nell’esempio seguente, la pipeline riceve l’oggetto DelayedReturn per la funzione train. SageMaker L'intelligenza artificiale aggiunge il preprocess passaggio, come passaggio precedente ditrain, al grafico della pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )
Se non ci sono dati o dipendenze personalizzate tra le fasi e si eseguono più fasi in parallelo, il grafo della pipeline ha più di un nodo foglia. Passa tutti questi nodi foglia in un elenco all’argomento steps nella definizione della pipeline, come mostrato nell’esempio seguente:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )
Quando la pipeline è in esecuzione, entrambi le fasi vengono eseguite in parallelo.
Passa solo i nodi foglia del grafico alla pipeline perché contengono informazioni su tutti le fasi precedenti definite tramite dipendenze dei dati o personalizzate. Quando compila la pipeline, l' SageMaker intelligenza artificiale deduce anche tutti i passaggi successivi che formano il grafico della pipeline e aggiunge ciascuno di essi come passaggio separato alla pipeline.
Crea una pipeline
Crea una pipeline chiamando pipeline.create(), come mostrato nel frammento seguente. Per ulteriori informazioni su create(), consulta sagemaker.workflow.pipeline.Pipeline.create
role = "pipeline-role" pipeline.create(role)
Quando chiamipipeline.create(), l' SageMaker IA compila tutti i passaggi definiti come parte dell'istanza della pipeline. SageMaker L'intelligenza artificiale carica la funzione serializzata, gli argomenti e tutti gli altri artefatti relativi ai passaggi su Amazon S3.
I dati risiedono nel bucket S3 in base alla seguente struttura:
s3_root_uri/pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name/timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id/step_name/ results # returned output from the serialized function including the model
s3_root_uriè definito nel file di configurazione SageMaker AI e si applica all'intera pipeline. Se non definito, viene utilizzato il bucket SageMaker AI predefinito.
Nota
Ogni volta che l' SageMaker intelligenza artificiale compila una pipeline, l' SageMaker intelligenza artificiale salva le funzioni, gli argomenti e le dipendenze serializzati dei passaggi in una cartella con l'ora corrente. Questo si verifica ogni volta che esegui pipeline.create(), pipeline.update(), pipeline.upsert() o pipeline.definition().