

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Connexion à DynamoDB avec Amazon EMR Serverless
<a name="using-ddb-connector"></a>

Dans ce didacticiel, vous chargez un sous-ensemble de données du [Bureau des noms géographiques des États-Unis d'Amérique](https://www.usgs.gov/us-board-on-geographic-names) dans un compartiment Amazon S3, puis vous utilisez Hive ou Spark sur Amazon EMR Serverless pour copier les données dans une table Amazon DynamoDB à des fins d'interrogation. 

## Étape 1 : télécharger des données dans un compartiment Amazon S3
<a name="using-ddb-connector-s3"></a>

Pour créer un compartiment Amazon S3, suivez les instructions de la section [Création d'un compartiment](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/create-bucket.html) dans le *guide de l'utilisateur de la console Amazon Simple Storage Service*. Remplacez les références `amzn-s3-demo-bucket` à par le nom du bucket que vous venez de créer. Votre application EMR Serverless est maintenant prête à exécuter des tâches.

1. Téléchargez l'exemple d'archive de données `features.zip` à l'aide de la commande suivante.

   ```
   wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
   ```

1. Extrayez le `features.txt` fichier de l'archive et accédez aux premières lignes du fichier :

   ```
   unzip features.zip
   head features.txt
   ```

   Le résultat devrait ressembler à ce qui suit.

   ```
   1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
   875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
   1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
   26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
   1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
   1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558
   1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024
   533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0
   829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671
   541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98
   ```

   Les champs de chaque ligne indiquent un identifiant unique, un nom, un type d'élément naturel, un état, une latitude en degrés, une longitude en degrés et une hauteur en pieds.

1. Chargez vos données sur Amazon S3

   ```
   aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/
   ```

## Étape 2 : Création d'une table Hive
<a name="using-ddb-connector-create-table"></a>

Utilisez Apache Spark ou Hive pour créer une nouvelle table Hive contenant les données téléchargées dans Amazon S3.

------
#### [ Spark ]

Pour créer une table Hive avec Spark, exécutez la commande suivante.

```
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

sparkSession.sql("CREATE TABLE hive_features \
    (feature_id BIGINT, \
    feature_name STRING, \
    feature_class STRING, \
    state_alpha STRING, \
    prim_lat_dec DOUBLE, \
    prim_long_dec DOUBLE, \
    elev_in_ft BIGINT) \
    ROW FORMAT DELIMITED \
    FIELDS TERMINATED BY '|' \
    LINES TERMINATED BY '\n' \
    LOCATION 's3://amzn-s3-demo-bucket/features';")
```

Vous disposez désormais d'une table Hive remplie avec les données du `features.txt` fichier. Pour vérifier que vos données figurent dans la table, exécutez une requête SQL Spark comme indiqué dans l'exemple suivant.

```
sparkSession.sql(
    "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
```

------
#### [ Hive ]

Pour créer une table Hive avec Hive, exécutez la commande suivante.

```
CREATE TABLE hive_features
    (feature_id             BIGINT,
    feature_name            STRING ,
    feature_class           STRING ,
    state_alpha             STRING,
    prim_lat_dec            DOUBLE ,
    prim_long_dec           DOUBLE ,
    elev_in_ft              BIGINT)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '|'
    LINES TERMINATED BY '\n'
    LOCATION 's3://amzn-s3-demo-bucket/features';
```

Vous disposez désormais d'une table Hive qui contient les données du `features.txt` fichier. Pour vérifier que vos données figurent dans la table, exécutez une requête HiveQL, comme indiqué dans l'exemple suivant.

```
SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;
```

------

## Étape 3 : Copier les données dans DynamoDB
<a name="using-ddb-connector-copy"></a>

Utilisez Spark ou Hive pour copier des données dans une nouvelle table DynamoDB.

------
#### [ Spark ]

Pour copier les données de la table Hive que vous avez créée à l'étape précédente dans DynamoDB, **suivez les étapes** 1 à 3 de la section [Copier](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html) les données dans DynamoDB. Cela crée une nouvelle table DynamoDB appelée. `Features` Vous pouvez ensuite lire les données directement depuis le fichier texte et les copier dans votre table DynamoDB, comme le montre l'exemple suivant.

```
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

object EmrServerlessDynamoDbTest {

    def main(args: Array[String]): Unit = {
    
        jobConf.set("dynamodb.input.tableName", "Features")
        jobConf.set("dynamodb.output.tableName", "Features")
        jobConf.set("dynamodb.region", "region")

        jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
        jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
    
        val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/")
            .map(row => {
                val line = row.split("\\|")
                val item = new DynamoDBItemWritable()
                
                val elevInFt = if (line.length > 6) {
                    new AttributeValue().withN(line(6))
                } else {
                    new AttributeValue().withNULL(true)
                }
                
                item.setItem(Map(
                    "feature_id" -> new AttributeValue().withN(line(0)), 
                    "feature_name" -> new AttributeValue(line(1)), 
                    "feature_class" -> new AttributeValue(line(2)), 
                    "state_alpha" -> new AttributeValue(line(3)), 
                    "prim_lat_dec" -> new AttributeValue().withN(line(4)), 
                    "prim_long_dec" -> new AttributeValue().withN(line(5)),
                    "elev_in_ft" -> elevInFt)
                    .asJava)
                (new Text(""), item)
        })
        rdd.saveAsHadoopDataset(jobConf)
    }
}
```

------
#### [ Hive ]

Pour copier les données de la table Hive que vous avez créée à l'étape précédente dans DynamoDB, suivez les instructions de la section [Copier](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.CopyDataToDDB.html) les données dans DynamoDB.

------

## Étape 4 : demander des données à partir de DynamoDB
<a name="using-ddb-connector-query"></a>

Utilisez Spark ou Hive pour interroger votre table DynamoDB.

------
#### [ Spark ]

Pour interroger les données de la table DynamoDB que vous avez créée à l'étape précédente, utilisez Spark SQL ou l'API Spark. MapReduce 

**Example — Interrogez votre table DynamoDB avec Spark SQL**  
La requête SQL Spark suivante renvoie une liste de tous les types de fonctionnalités par ordre alphabétique.  

```
val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \
    FROM ddb_features \
    ORDER BY feature_class;")
```
La requête SQL Spark suivante renvoie une liste de tous les lacs commençant par la lettre *M.*  

```
val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \
    FROM ddb_features \
    WHERE feature_class = 'Lake' \
    AND feature_name LIKE 'M%' \
    ORDER BY feature_name;")
```
La requête SQL Spark suivante renvoie une liste de tous les états comportant au moins trois entités situées à plus d'un kilomètre.  

```
val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \
    FROM ddb_features \
    WHERE elev_in_ft > 5280 \
    GROUP by state_alpha, feature_class \
    HAVING COUNT(*) >= 3 \
    ORDER BY state_alpha, feature_class;")
```

**Example — Interrogez votre table DynamoDB avec l'API Spark MapReduce**  
La MapReduce requête suivante renvoie une liste de tous les types de fonctionnalités par ordre alphabétique.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .map(pair => pair._2.get("feature_class").getS)
    .distinct()
    .sortBy(value => value)
    .toDF("feature_class")
```
La MapReduce requête suivante renvoie une liste de tous les lacs commençant par la lettre *M.*  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => (pair._1, pair._2.getItem))
    .filter(pair => "Lake".equals(pair._2.get("feature_class").getS))
    .filter(pair => pair._2.get("feature_name").getS.startsWith("M"))
    .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS))
    .sortBy(_._1)
    .toDF("feature_name", "state_alpha")
```
La MapReduce requête suivante renvoie une liste de tous les états comportant au moins trois entités situées à plus d'un kilomètre.  

```
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
    .map(pair => pair._2.getItem)
    .filter(pair => pair.get("elev_in_ft").getN != null)
    .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280)
    .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS))
    .filter(pair => pair._2.size >= 3)
    .map(pair => (pair._1._1, pair._1._2, pair._2.size))
    .sortBy(pair => (pair._1, pair._2))
    .toDF("state_alpha", "feature_class", "count")
```

------
#### [ Hive ]

Pour interroger les données de la table DynamoDB que vous avez créée à l'étape précédente, suivez les instructions de la section [Interroger les données de la](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.Tutorial.QueryDataInDynamoDB.html) table DynamoDB.

------