Esecuzione di processi ETL su Tabelle Amazon S3 con AWS Glue
AWS Glue è un servizio di integrazione dati serverless che semplifica agli utenti analitici il rilevamento, la preparazione, lo spostamento e l'integrazione di dati da più origini. Puoi utilizzare i processi di AWS Glue per eseguire pipeline di estrazione, trasformazione e caricamento (ETL) al fine di caricare dati nei data lake. Per ulteriori informazioni su AWS Glue, consulta Che cos'è AWS Glue? nella Guida per gli sviluppatori di AWS Glue.
Un processo AWS Glue incapsula uno script che si connette ai dati di origine, lo elabora e quindi lo scrive nella destinazione dati. Di solito un processo esegue script di estrazione, trasformazione e caricamento (ETL). I processi possono eseguire script progettati per gli ambienti di runtime di Apache Spark. È possibile monitorare le esecuzioni dei processi per comprendere i parametri di runtime come esito positivo, durata e ora di inizio.
Puoi utilizzare i processi di AWS Glue per elaborare i dati nelle tabelle S3 connettendoti alle tabelle stesse tramite l’integrazione con i servizi di analisi AWS oppure connetterti direttamente utilizzando l’endpoint Iceberg REST di Tabelle Amazon S3 o il catalogo di Tabelle Amazon S3 per Apache Iceberg. Questa guida illustra i passaggi di base per iniziare a utilizzare AWS Glue con Tabelle S3, tra cui:
Scegli il metodo di accesso in base ai requisiti specifici dei processi ETL di AWS Glue:
-
Integrazione dei servizi di analisi AWS (metodo consigliato): metodo consigliato quando è necessaria una gestione centralizzata dei metadati su più servizi di analisi AWS, quando desideri utilizzare le autorizzazioni esistenti del Catalogo Dati AWS Glue e il controllo granulare degli accessi con Lake Formation oppure quando stai creando pipeline ETL di produzione che si integrano con altri servizi AWS come Athena o Amazon EMR.
-
Endpoint Iceberg REST di Tabelle Amazon S3: metodo consigliato quando devi connetterti alle tabelle S3 da motori di query di terze parti che supportano Apache Iceberg, quando devi creare applicazioni ETL personalizzate che richiedono l’accesso diretto alla REST API oppure quando ti serve avere il controllo sulle operazioni del catalogo senza dipendere dal Catalogo Dati AWS Glue.
-
Catalogo di Tabelle Amazon S3 per Apache Iceberg: da utilizzare solo per applicazioni legacy o scenari di programmazione specifici che richiedono la libreria client Java. Questo metodo non è consigliato per le nuove implementazioni dei processi ETL di AWS Glue a causa della maggiore complessità e della necessità di gestire ulteriori dipendenze JAR.
Fase 1: prerequisiti
Prima di poter interrogare le tabelle da un processo di AWS Glue, è necessario configurare un ruolo IAM che AWS Glue possa utilizzare per eseguire il processo stesso. Scegli il metodo di accesso per visualizzare i prerequisiti specifici relativi a quel metodo.
- AWS analytics services integration (Recommended)
-
Prerequisiti necessari per utilizzare l’integrazione dell’analisi AWS di Tabelle Amazon S3 per eseguire i processi di AWS Glue.
- Amazon S3 Tables Iceberg REST endpoint
-
Prerequisiti per utilizzare l’endpoint Iceberg REST di Tabelle Amazon S3 per eseguire i processi ETL di AWS Glue.
- Amazon S3 Tables Catalog for Apache Iceberg
-
Prerequisiti per utilizzare il catalogo di Tabelle Amazon S3 per Apache Iceberg al fine di eseguire i processi ETL di AWS Glue.
Fase 2: creazione di uno script per il collegamento ai bucket della tabella
Per accedere ai dati della tabella quando esegui un processo ETL di AWS Glue, devi configurare una sessione Spark per Apache Iceberg che si connetta al bucket della tabella S3. Puoi modificare uno script esistente per collegarti al bucket della tabella o creare un nuovo script. Per ulteriori informazioni sulla creazione di script di AWS Glue, consulta Tutorial: come scrivere uno script di AWS Glue per Spark nella Guida per gli sviluppatori di AWS Glue.
Puoi configurare la sessione per connetterti ai bucket della tabella utilizzando uno dei seguenti metodi di accesso di Tabelle Amazon S3:
-
Integrazione dei servizi di analisi AWS di Tabelle Amazon S3 (metodo consigliato)
-
Endpoint Iceberg REST di Tabelle Amazon S3
-
Catalogo di Tabelle Amazon S3 per Apache Iceberg
Scegli tra i seguenti metodi di accesso per visualizzare le istruzioni e gli esempi di configurazione.
- AWS analytics services integration (Recommended)
-
Come prerequisito per interrogare le tabelle con Spark su AWS Glue utilizzando l’integrazione dei servizi di analisi AWS, è necessario integrare i bucket delle tabelle con i servizi di analisi AWS.
Puoi configurare la connessione al bucket di tabelle tramite una sessione Spark in un processo o con i comandi magic di AWS Glue Studio in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
- Utilizzo di uno script PySpark
-
Usa il seguente frammento di codice in uno script PySpark per configurare un processo di AWS Glue e connetterti al bucket della tabella tramite l’integrazione.
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables") \
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \
.getOrCreate()
- Utilizzo di una sessione AWS Glue interattiva
-
Se utilizzi una sessione notebook interattiva con AWS Glue 5.0, specifica le stesse configurazioni usando il comando magic %%configure in una cella prima dell’esecuzione del codice.
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
- Amazon S3 Tables Iceberg REST endpoint
-
Puoi configurare la connessione al bucket di tabelle tramite una sessione Spark in un processo o con i comandi magic di AWS Glue Studio in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
- Utilizzo di uno script PySpark
Usa il seguente frammento di codice in uno script PySpark per configurare un processo di AWS Glue e connetterti al bucket della tabella tramite l’endpoint.
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
- Utilizzo di una sessione AWS Glue interattiva
Se utilizzi una sessione notebook interattiva con AWS Glue 5.0, specifica le stesse configurazioni usando il comando magic %%configure in una cella prima dell’esecuzione del codice. Sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
- Amazon S3 Tables Catalog for Apache Iceberg
-
Come prerequisito per la connessione alle tabelle utilizzando il catalogo di Tabelle Amazon S3 per Apache Iceberg, devi innanzitutto scaricare il file jar del catalogo più recente e caricarlo in un bucket S3. Quindi, quando crei il processo, devi aggiungere il percorso al file JAR del catalogo client come parametro speciale. Per ulteriori informazioni sui parametri dei processi in AWS Glue, consulta Parametri speciali utilizzati nei processi di AWS Glue nella Guida per gli sviluppatori di AWS Glue.
Puoi configurare la connessione al bucket di tabelle tramite una sessione Spark in un processo o con i comandi magic di AWS Glue Studio in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
- Utilizzo di uno script PySpark
-
Usa il seguente frammento di codice in uno script PySpark per configurare un processo di AWS Glue e connetterti al bucket della tabella tramite il file JAR. Sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
- Utilizzo di una sessione AWS Glue interattiva
-
Se utilizzi una sessione notebook interattiva con AWS Glue 5.0, specifica le stesse configurazioni usando il comando magic %%configure in una cella prima dell’esecuzione del codice. Sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}
Script di esempio
I seguenti script PySpark di esempio possono essere utilizzati per testare l’interrogazione delle tabelle S3 con un processo di AWS Glue. Questi script si connettono al bucket della tabella ed eseguono query per creare un nuovo namespace, creare una tabella di esempio, inserire dati nella tabella e restituire i dati della tabella. Per utilizzare gli script, sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.
Scegli tra i seguenti script in base al tuo metodo di accesso a Tabelle S3.
- S3 Tables integration with AWS analytics services
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables")
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
namespace = "new_namespace"
table = "new_table"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Iceberg REST endpoint
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
namespace = "s3_tables_rest_namespace"
table = "new_table_s3_rest"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Catalog for Apache Iceberg
-
from pyspark.sql import SparkSession
#Spark session configurations
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
#Script
namespace = "new_namespace"
table = "new_table"
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}")
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Fase 3: creazione di un processo di AWS Glue per l’interrogazione delle tabelle
Le seguenti procedure mostrano come configurare i processi di AWS Glue che si connettono ai bucket delle tabelle S3. È possibile effettuare questa operazione utilizzando la AWS CLI o la console con l’editor di script di AWS Glue Studio. Per ulteriori informazioni, consulta Creazione di processi in AWS Glue nella Guida per l’utente di AWS Glue.
La procedura seguente mostra come utilizzare l’editor di script di AWS Glue Studio per creare un processo ETL che interroga le tabelle S3.
Apri la console AWS Glue all'indirizzo https://console.aws.amazon.com/glue/.
-
Nel riquadro di navigazione, seleziona Processi ETL.
-
Seleziona Editor di script, quindi scegli Carica script e carica lo script PySpark che hai creato per interrogare le tabelle S3.
-
Seleziona la scheda Dettagli del processo e inserisci quanto segue per Proprietà di base.
-
In Nome, inserisci un nome per il processo.
-
In Ruolo IAM, seleziona il ruolo creato per AWS Glue.
-
(Facoltativo) Se utilizzi il catalogo di Tabelle Amazon S3 per Apache Iceberg come metodo di accesso, espandi Proprietà avanzate e per il percorso dei file JAR dipendenti inserisci l’URI S3 del file jar del catalogo client che hai caricato in un bucket S3 come prerequisito. Ad es. s3://amzn-s3-demo-bucket1/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
-
Seleziona Salva per creare il processo.
-
Seleziona Esegui per avviare il processo e verifica lo stato del processo stesso nella scheda Esecuzioni.
La procedura seguente mostra come utilizzare la AWS CLI per creare un processo ETL che interroga le tabelle S3. Per utilizzare i comandi, sostituisci i valori segnaposto con i tuoi valori.
-
Creare un processo AWS Glue.
aws glue create-job \
--name etl-tables-job \
--role arn:aws:iam::111122223333:role/AWSGlueServiceRole \
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py",
"PythonVersion": "3"
}' \
--default-arguments '{
"--job-language": "python",
"--class": "GlueApp"
}' \
--glue-version "5.0"
(Facoltativo) Se utilizzi il catalogo di Tabelle Amazon S3 per Apache Iceberg come metodo di accesso, aggiungi il file JAR del catalogo client a --default-arguments utilizzando il parametro --extra-jars. Sostituisci i segnaposto di input con i tuoi quando aggiungi il parametro.
"--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
-
Avvia il processo.
aws glue start-job-run \
--job-name etl-tables-job
-
Per verificare lo stato del processo, copia l’ID di esecuzione dal comando precedente e inseriscilo nel comando seguente.
aws glue get-job-run --job-name etl-tables-job \
--run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad