Exemples de sources de données personnalisées - Amazon SageMaker AI

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemples de sources de données personnalisées

Cette section fournit des exemples d'implémentations de sources de données personnalisées pour les intégrateurs de fonctionnalités. Pour plus d'informations sur les sources de données personnalisées, consultez Sources de données personnalisées.

La sécurité est une responsabilité partagée entre AWS et nos clients. AWSest chargé de protéger l'infrastructure qui gère les services dans leAWS Cloud. Les clients sont responsables de toutes leurs tâches de configuration et de gestion de sécurité nécessaires. Par exemple, des secrets tels que les informations d'identification d'accès aux magasins de données ne doivent pas être codés en dur dans vos sources de données personnalisées. Vous pouvez les utiliser AWS Secrets Manager pour gérer ces informations d'identification. Pour plus d'informations sur Secrets Manager, consultez Qu'est-ce que c'est AWS Secrets Manager ? dans le guide de AWS Secrets Manager l'utilisateur. Les exemples suivants utilisent Secrets Manager pour vos informations d’identification.

Exemples de source de données personnalisée de clusters Amazon Redshift (JDBC)

Amazon Redshift propose un pilote JDBC qui peut être utilisé pour lire des données avec Spark. Pour obtenir des informations sur le téléchargement du pilote JDBC Amazon Redshift, consultez Téléchargement du pilote JDBC Amazon Redshift version 2.1.

Pour créer la classe de sources de données Amazon Redshift personnalisée, vous devez remplacer la méthode read_data à partir des Sources de données personnalisées.

Pour vous connecter à un cluster Amazon Redshift, vous avez besoin des éléments suivants :

  • URL JDBC Amazon Redshift (jdbc-url)

    Pour obtenir des informations sur l'obtention de votre URL JDBC Amazon Redshift, consultez Obtention de l'URL JDBC dans le Guide du développeur de base de données Amazon Redshift.

  • Nom d'utilisateur (redshift-user) et mot de passe (redshift-password) Amazon Redshift

    Pour obtenir des informations sur la manière de créer et de gérer des utilisateurs de base de données à l'aide des commandes SQL Amazon Redshift, consultez Utilisateurs dans le Guide du développeur de base de données Amazon Redshift.

  • Nom de table Amazon Redshift (redshift-table-name)

    Pour obtenir des informations sur la manière de créer une table à partir de quelques exemples, consultez CREATE TABLE dans le Guide du développeur de base de données Amazon Redshift.

  • (Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (secret-redshift-account-info) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Amazon Redshift dans Secrets Manager.

    Pour plus d'informations sur Secrets Manager, consultez la section Rechercher des secrets AWS Secrets Manager dans le Guide de AWS Secrets Manager l'utilisateur.

  • Région AWS(your-region)

    Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name dans la documentation de Boto3.

L'exemple suivant montre comment récupérer l'URL JDBC et le jeton d'accès personnel depuis Secrets Manager et comment remplacer read_data pour votre classe de sources de données personnalisée, DatabricksDataSource.

from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "redshift-resource-arn" def read_data(self, spark, params): url = "jdbc-url?user=redshift-user&password=redshift-password" aws_iam_role_arn = "redshift-command-access-role" secret_name = "secret-redshift-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name") \ .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()

L'exemple suivant montre comment connecter RedshiftDataSource à votre décorateur feature_processor.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df

Pour exécuter la tâche de l’intégrateur de caractéristiques à distance, vous devez fournir le pilote jdbc en définissant SparkConfig et le transmettre au décorateur @remote.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Exemples de sources de données personnalisées Snowflake

Snowflake fournit un connecteur Spark qui peut être utilisé pour votre décorateur feature_processor. Pour obtenir des informations sur le connecteur Snowflake pour Spark, consultez Connecteur Snowflake pour Spark dans la documentation Snowflake.

Pour créer la classe de sources de données Snowflake personnalisée, vous devez remplacer la méthode read_data à partir des Sources de données personnalisées et ajouter les packages du connecteur Spark au chemin de classe Spark.

Pour vous connecter à une source de données Snowflake, vous avez besoin des éléments suivants :

  • URL Snowflake (sf-url)

    URLs Pour plus d'informations sur l'accès aux interfaces Web de Snowflake, consultez la section Identifiants de compte dans la documentation de Snowflake.

  • Base de données Snowflake (sf-database)

    Pour obtenir des informations sur l'obtention du nom de votre base de données à l'aide de Snowflake, consultez CURRENT_DATABASE dans la documentation de Snowflake.

  • Schéma de base de données Snowflake (sf-schema)

    Pour en savoir plus sur l'obtention du nom de votre schéma à l'aide de Snowflake, consultez CURRENT_SCHEMA dans la documentation de Snowflake.

  • Entrepôt Snowflake (sf-warehouse)

    Pour obtenir des informations sur l'obtention du nom de votre entrepôt à l'aide de Snowflake, consultez CURRENT_WAREHOUSE dans la documentation de Snowflake.

  • Nom de table Snowflake (sf-table-name)

  • (Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (secret-snowflake-account-info) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Snowflake dans Secrets Manager.

    Pour plus d'informations sur Secrets Manager, consultez la section Rechercher des secrets AWS Secrets Manager dans le Guide de AWS Secrets Manager l'utilisateur.

  • Région AWS(your-region)

    Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name dans la documentation de Boto3.

L'exemple suivant montre comment récupérer le nom d'utilisateur et le mot de passe Snowflake depuis Secrets Manager et comment remplacer la fonction read_data pour votre classe de sources de données personnalisée SnowflakeDataSource.

from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "sf-url", "sfDatabase" : "sf-database", "sfSchema" : "sf-schema", "sfWarehouse" : "sf-warehouse", } data_source_name = "Snowflake" data_source_unique_id = "sf-url" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name") \ .load()

L'exemple suivant montre comment connecter SnowflakeDataSource à votre décorateur feature_processor.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=feature-group-arn, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df

Pour exécuter la tâche de l'intégrateur de fonctionnalités à distance, vous devez fournir les packages en définissant SparkConfig et les transmettre au décorateur @remote. Dans l’exemple suivant, les packages Spark sont tels que spark-snowflake_2.12 correspond à la version Scala de l’intégrateur de caractéristiques, 2.12.0 à la version de Snowflake que vous souhaitez utiliser et spark_3.3 à la version Spark de l’intégrateur de caractéristiques.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="feature-group-arn>", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Exemples de sources de données personnalisées Databricks (JDBC)

Spark peut lire les données de Databricks à l'aide du pilote JDBC Databricks. Pour obtenir des informations sur le pilote JDBC Databricks, consultez Configuration des pilotes ODBC et JDBC Databricks (langue française non garantie) dans la documentation de Databricks.

Note

Vous pouvez lire les données de n'importe quelle autre base de données en incluant le pilote JDBC correspondant dans le chemin de classe Spark. Pour plus d'informations, consultez JDBC vers d'autres bases de données (langue française non garantie) dans le Guide de Spark SQL.

Pour créer la classe de sources de données Databricks personnalisée, vous devez remplacer la méthode read_data à partir des Sources de données personnalisées et ajouter le fichier JAR JDBC au chemin de classe Spark.

Pour vous connecter à une source de données Databricks, vous avez besoin des éléments suivants :

  • URL Databricks (databricks-url)

    Pour obtenir des informations sur votre URL Databricks, consultez Création de l'URL de connexion pour le pilote Databricks (langue française non garantie) dans la documentation de Databricks.

  • Jeton d'accès personnel Databricks (personal-access-token)

    Pour obtenir des informations sur votre jeton d'accès Databricks, consultez Authentification par jeton d'accès personnel Databricks (langue française non garantie) dans la documentation de Databricks.

  • Nom de catalogue de données (db-catalog)

    Pour obtenir des informations sur le nom de votre catalogue Databricks, consultez Nom de catalogue (langue française non garantie) dans la documentation de Databricks.

  • Nom de schéma (db-schema)

    Pour obtenir des informations sur le nom de votre schéma Databricks, consultez Nom de schéma (langue française non garantie) dans la documentation de Databricks.

  • Nom de table (db-table-name)

    Pour obtenir des informations sur le nom de votre table Databricks, consultez Nom de table (langue française non garantie) dans la documentation de Databricks.

  • (Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (secret-databricks-account-info) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Databricks dans Secrets Manager.

    Pour plus d'informations sur Secrets Manager, consultez la section Rechercher des secrets AWS Secrets Manager dans le Guide de AWS Secrets Manager l'utilisateur.

  • Région AWS(your-region)

    Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name dans la documentation de Boto3.

L'exemple suivant montre comment récupérer l'URL JDBC et le jeton d'accès personnel depuis Secrets Manager et comment remplacer read_data pour votre classe de sources de données personnalisée DatabricksDataSource.

from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "databricks-url" def read_data(self, spark, params): secret_name = "secret-databricks-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()

L'exemple suivant montre comment charger le fichier JAR de pilote JDBC, jdbc-jar-file-name.jar, sur Amazon S3 afin de l'ajouter au chemin de classe Spark. Pour obtenir des informations sur le téléchargement du pilote JDBC Spark (jdbc-jar-file-name.jar) depuis Databricks, consultez Téléchargement du pilote JDBC (langue française non garantie) sur le site Web de Databricks.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=feature-group-arn, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"} ) def transform(input_df): return input_df

Pour exécuter la tâche de l’intégrateur de caractéristiques à distance, vous devez fournir les fichiers JAR en définissant SparkConfig et les transmettre au décorateur @remote.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Exemples de sources de données personnalisées de streaming

Vous pouvez vous connecter à des sources de données de streaming telles qu’Amazon Kinesis, et créer des transformations avec Spark Structured Streaming pour lire à partir de sources de données de streaming. Pour plus d'informations sur le connecteur Kinesis, consultez la section Connecteur Kinesis pour Spark Structured Streaming in. GitHub Pour en savoir plus sur Amazon Kinesis, consultez Présentation d’Amazon Kinesis Data Streams dans le Guide du développeur Amazon Kinesis.

Pour créer la classe de sources de données Amazon Kinesis personnalisée, vous devez étendre la classe BaseDataSource et remplacer la méthode read_data à partir des Sources de données personnalisées.

Pour vous connecter à un flux de données Amazon Kinesis, vous avez besoin des éléments suivants :

from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "kinesis-resource-arn" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis.your-region.amazonaws.com") .load()

L’exemple suivant montre comment connecter KinesisDataSource à votre décorateur feature_processor.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "feature-group-arn" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path") .start() ) output_stream.awaitTermination()

Dans l’exemple de code ci-dessus, nous utilisons quelques options de Spark Structured Streaming pour diffuser des micro-lots dans votre groupe de caractéristiques. Pour obtenir la liste complète des options, consultez Structured Streaming Programming Guide dans la documentation d’Apache Spark.

  • Le mode récepteur foreachBatch est une fonctionnalité qui vous permet d’appliquer des opérations et d’écrire la logique sur les données de sortie de chaque micro-lot d’une requête de streaming.

    Pour plus d'informationsforeachBatch, consultez la section Utilisation de Foreach et le guide ForeachBatch de programmation de streaming structuré d'Apache Spark.

  • L’option checkpointLocation enregistre régulièrement l’état de l’application de streaming. Le journal de streaming est enregistré à l’emplacement s3a://checkpoint-path du point de contrôle.

    Pour obtenir des informations sur l’option checkpointLocation, consultez Recovering from Failures with Checkpointing dans le Guide de programmation de streaming structuré d’Apache Spark.

  • Le paramètre trigger définit la fréquence à laquelle le traitement par micro-lots est déclenché dans une application de streaming. Dans cet exemple, le type de déclencheur du temps de traitement est utilisé avec des intervalles de micro-lots d’une minute, spécifiés par trigger(processingTime="1 minute"). Pour effectuer un remplissage à partir d’une source de flux, vous pouvez utiliser le type de déclencheur available-now, spécifié par trigger(availableNow=True).

    Pour obtenir la liste complète des types trigger, consultez Triggers dans le Guide de programmation de streaming structuré d’Apache Spark.

Streaming continu et nouvelles tentatives automatiques à l’aide de déclencheurs basés sur des événements

Le Feature Processor utilise la SageMaker formation comme infrastructure de calcul et sa durée d'exécution maximale est de 28 jours. Vous pouvez utiliser des déclencheurs basés sur des événements pour prolonger votre streaming continu sur une plus longue période et récupérer d’échecs temporaires. Pour plus d’informations sur les exécutions basées sur la planification et les événements, consultez Exécutions planifiées et basées sur des événements pour les pipelines d’intégrateur de caractéristiques.

Voici un exemple de configuration d’un déclencheur basé sur les événements pour que le pipeline d’intégrateur de caractéristiques de streaming continue à fonctionner en continu. Cela utilise la fonction de transformation de streaming définie dans l’exemple précédent. Un pipeline cible peut être configuré pour être déclenché lorsqu’un événement STOPPED ou FAILED se produit pour l’exécution d’un pipeline source. Notez que le même pipeline est utilisé comme source et cible afin qu’il fonctionne en continu.

import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "streaming-pipeline" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )