

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Acheminer les enregistrements entrants vers différentes tables Iceberg
<a name="apache-iceberg-format-input-record-different"></a>

Amazon Data Firehose peut acheminer les enregistrements entrants d'un flux vers différentes tables Iceberg en fonction du contenu de l'enregistrement. Les enregistrements ne sont pas conservés en ordre lorsqu'ils sont livrés par Amazon Data Firehose. Examinez l'exemple d'enregistrement d'entrée suivant. 

```
{
  "deviceId": "Device1234",
  "timestamp": "2024-11-28T11:30:00Z",
  "data": {
    "temperature": 21.5,
    "location": {
      "latitude": 37.3324,
      "longitude": -122.0311
    }
  },
  "powerlevel": 84,
  "status": "online"
}
```

```
{
  "deviceId": "Device4567",
  "timestamp": "2023-11-28T10:40:00Z",
  "data": {
    "pressure": 1012.4,
    "location": {
      "zipcode": 24567
    }
  },
  "powerlevel": 82,
  "status": "online"
}
```

Dans cet exemple, le `deviceId` champ possède deux valeurs possibles : `Device1234` et`Device4567`. Lorsqu'un enregistrement entrant possède un `deviceId` champ comme`Device1234`, nous voulons écrire l'enregistrement dans une table Iceberg nommée`Device1234`, et lorsqu'un enregistrement entrant a un `deviceId` champ comme`Device4567`, nous voulons écrire l'enregistrement dans une table nommée`Device4567`. 

Notez que les enregistrements contenant `Device1234` et `Device4567` peuvent avoir un ensemble de champs différent qui correspondent à différentes colonnes de la table Iceberg correspondante. Les enregistrements entrants peuvent avoir une structure JSON imbriquée dans laquelle ils **`deviceId`**peuvent être imbriqués dans l'enregistrement JSON. Dans les sections à venir, nous verrons comment acheminer les enregistrements vers différentes tables en fournissant les informations de routage appropriées à Firehose dans de tels scénarios. 

## Fournir des informations de routage à Firehose avec expression JSONQuery
<a name="apache-iceberg-route-jq"></a>

Le moyen le plus simple et le plus rentable de fournir des informations de routage d'enregistrements à Firehose consiste à fournir une JSONQuery expression. Avec cette approche, vous fournissez des JSONQuery expressions pour trois paramètres : `Database Name``Table Name`, et (éventuellement)`Operation`. Firehose utilise l'expression que vous fournissez pour extraire des informations de vos enregistrements de flux entrants afin de les acheminer. 

Le `Database Name` paramètre indique le nom de la base de données de destination. Le `Table Name` paramètre indique le nom de la table de destination. `Operation`est un paramètre facultatif qui indique s'il faut insérer l'enregistrement du flux entrant en tant que nouvel enregistrement dans la table de destination, ou s'il faut modifier ou supprimer un enregistrement existant dans la table de destination. Le champ Opération doit avoir l'une des valeurs suivantes — `insert``update`, ou`delete`. 

Pour chacun de ces trois paramètres, vous pouvez fournir une valeur statique ou une expression dynamique dans laquelle la valeur est extraite de l'enregistrement entrant. Par exemple, si vous souhaitez transmettre tous les enregistrements de flux entrants à une seule base de données nommée`IoTevents`, le nom de la base de données aura une valeur statique de`“IoTevents”`. Si le nom de la table de destination doit être obtenu à partir d'un champ de l'enregistrement entrant, le nom de la table est une expression dynamique qui indique le champ de l'enregistrement entrant à partir duquel le nom de la table de destination doit être extrait. 

Dans l'exemple suivant, nous utilisons une valeur statique pour le nom de la base de données, une valeur dynamique pour le nom de la table et une valeur statique pour le fonctionnement. Notez que la spécification de l'opération est facultative. Si aucune opération n'est spécifiée, Firehose insère les enregistrements entrants dans la table de destination en tant que nouveaux enregistrements par défaut. 

```
Database Name : "IoTevents"
Table Name : .deviceId
Operation : "insert"
```

Si le `deviceId` champ est imbriqué dans l'enregistrement JSON, nous indiquons le nom de la table avec les informations du champ imbriqué sous la forme. `.event.deviceId` 

**Note**  
Lorsque vous spécifiez l'opération sous la forme `update` ou`delete`, vous devez soit spécifier des clés uniques pour la table de destination lorsque vous configurez votre stream Firehose, soit définir [identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)dans Iceberg lorsque vous exécutez des opérations de [création de table ou de [modification](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields) de table](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table) dans Iceberg. Si vous ne le spécifiez pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.
Les `Table Name` valeurs `Database Name` et doivent correspondre exactement aux noms de votre base de données et de table de destination. S'ils ne correspondent pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.

## Fournir des informations de routage à l'aide d'une AWS Lambda fonction
<a name="apache-iceberg-route-lambda"></a>

Il peut arriver que vous disposiez de règles complexes qui déterminent comment acheminer les enregistrements entrants vers une table de destination. Par exemple, vous pouvez avoir une règle qui définit si un champ contient la valeur A, B ou F, qui doit être acheminée vers une table de destination nommée `TableX` ou vous pouvez augmenter l'enregistrement du flux entrant en ajoutant des attributs supplémentaires. Par exemple, si un enregistrement contient un champ `device_id` égal à 1, vous pouvez ajouter un autre champ intitulé « modem » et écrire le champ supplémentaire dans la colonne de la table de destination. `device_type` Dans de tels cas, vous pouvez transformer le flux source à l'aide d'une AWS Lambda fonction dans Firehose et fournir des informations de routage dans le cadre de la sortie de la fonction de transformation Lambda. Pour comprendre comment transformer le flux source à l'aide d'une AWS Lambda fonction dans Firehose, consultez [Transformer les données source dans Amazon Data Firehose](data-transformation.md).

Lorsque vous utilisez Lambda pour la transformation d'un flux source dans Firehose, la sortie doit contenir des paramètres `recordId``result`, et/ou. `data` `KafkaRecordValue` Le paramètre `recordId` contient l'enregistrement du flux d'entrée, `result` indique si la transformation a réussi et `data` contient la sortie transformée codée en Base64 de votre fonction Lambda. Pour de plus amples informations, veuillez consulter [Paramètres requis pour la transformation des données](data-transformation-status-model.md).

```
{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1"
}
```

Pour spécifier des informations de routage à Firehose sur la manière de router l'enregistrement du flux vers une table de destination dans le cadre de votre fonction Lambda, la sortie de votre fonction Lambda doit contenir une section supplémentaire pour. `metadata` L'exemple suivant montre comment la section des métadonnées est ajoutée à la sortie Lambda pour un flux Firehose qui utilise Kinesis Data Streams comme source de données pour indiquer à Firehose qu'il doit insérer l'enregistrement en tant que nouvel enregistrement dans la table nommée de la base de données. `Device1234` `IoTevents`

```
 {
"recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    
    "metadata":{
"otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }
```

De même, l'exemple suivant montre comment ajouter la section de métadonnées à la sortie Lambda pour un Firehose qui utilise Amazon Managed Streaming pour Apache Kafka comme source de données pour indiquer à Firehose qu'il doit insérer l'enregistrement en tant que nouvel enregistrement dans une table nommée dans la base de données. `Device1234` `IoTevents`

```
{
"recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    
    "metadata":{
"otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }
```

Pour cet exemple, 
+ `destinationDatabaseName`fait référence au nom de la base de données cible et constitue un champ obligatoire.
+ `destinationTableName`fait référence au nom de la table cible et constitue un champ obligatoire.
+ `operation`est un champ facultatif avec des valeurs possibles telles que `insert``update`, et`delete`. Si vous ne spécifiez aucune valeur, l'opération par défaut est`insert`.

**Note**  
Lorsque vous spécifiez l'opération sous la forme `update` ou`delete`, vous devez soit spécifier des clés uniques pour la table de destination lorsque vous configurez votre stream Firehose, soit définir [identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)dans Iceberg lorsque vous exécutez des opérations de [création de table ou de [modification](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields) de table](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table) dans Iceberg. Si vous ne le spécifiez pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.
Les `Table Name` valeurs `Database Name` et doivent correspondre exactement aux noms de votre base de données et de table de destination. S'ils ne correspondent pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.
Lorsque votre flux Firehose possède à la fois une fonction de transformation Lambda et une expression JSONQuery , Firehose vérifie d'abord la présence du champ de métadonnées dans la sortie Lambda afin de déterminer comment acheminer l'enregistrement vers la table de destination appropriée, puis examine la sortie de votre expression pour détecter les champs manquants. JSONQuery   
Si le Lambda ou l' JSONQuery expression ne fournit pas les informations de routage requises, Firehose suppose qu'il s'agit d'un scénario à table unique et recherche les informations d'une seule table dans la configuration des clés uniques.   
Pour plus d'informations, consultez la table [Router les enregistrements entrants vers une seule table Iceberg.](apache-iceberg-format-input-record.md) Si Firehose ne parvient pas à déterminer les informations de routage et à faire correspondre l'enregistrement à une table de destination spécifiée, il fournit les données au compartiment d'erreur S3 que vous avez spécifié. 

### Exemple de fonction Lambda
<a name="format-record-lambda"></a>

Cette fonction Lambda est un exemple de code Python qui analyse les enregistrements de flux entrants et ajoute des champs obligatoires pour spécifier la manière dont les données doivent être écrites dans des tables spécifiques. Vous pouvez utiliser cet exemple de code pour ajouter la section des métadonnées pour les informations de routage.

```
import json
import base64
 
 
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'])
 
    firehose_records_output = {}
    firehose_records_output['records'] = []
 
 
    for firehose_record_input in firehose_records_input['records']:
 
        # Get payload from Lambda input, it could be different with different sources
        if 'kafkaRecordValue' in firehose_record_input:
            payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8')
        else
            payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8')
 
        # perform data processing on customer payload bytes here
 
        # Create output with proper record ID, output data (may be different with different sources), result, and metadata
        firehose_record_output = {}
 
        if 'kafkaRecordValue' in firehose_record_input:
            firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8'))
        else
            firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8'))
        
        firehose_record_output['recordId'] = firehose_record_input['recordId']
        firehose_record_output['result'] =  'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': 'your_destination_database',
                'destinationTableName': 'your_destination_table',
                'operation': 'insert'
                }
            }
        firehose_records_output['records'].append(firehose_record_output)
    return firehose_records_output
```