Acheminer les enregistrements entrants vers différentes tables Iceberg - Amazon Data Firehose

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Acheminer les enregistrements entrants vers différentes tables Iceberg

Amazon Data Firehose peut acheminer les enregistrements entrants d'un flux vers différentes tables Iceberg en fonction du contenu de l'enregistrement. Les enregistrements ne sont pas conservés en ordre lorsqu'ils sont livrés par Amazon Data Firehose. Examinez l'exemple d'enregistrement d'entrée suivant.

{ "deviceId": "Device1234", "timestamp": "2024-11-28T11:30:00Z", "data": { "temperature": 21.5, "location": { "latitude": 37.3324, "longitude": -122.0311 } }, "powerlevel": 84, "status": "online" }
{ "deviceId": "Device4567", "timestamp": "2023-11-28T10:40:00Z", "data": { "pressure": 1012.4, "location": { "zipcode": 24567 } }, "powerlevel": 82, "status": "online" }

Dans cet exemple, le deviceId champ possède deux valeurs possibles : Device1234 etDevice4567. Lorsqu'un enregistrement entrant possède un deviceId champ commeDevice1234, nous voulons écrire l'enregistrement dans une table Iceberg nomméeDevice1234, et lorsqu'un enregistrement entrant a un deviceId champ commeDevice4567, nous voulons écrire l'enregistrement dans une table nomméeDevice4567.

Notez que les enregistrements contenant Device1234 et Device4567 peuvent avoir un ensemble de champs différent qui correspondent à différentes colonnes de la table Iceberg correspondante. Les enregistrements entrants peuvent avoir une structure JSON imbriquée dans laquelle ils deviceIdpeuvent être imbriqués dans l'enregistrement JSON. Dans les sections à venir, nous verrons comment acheminer les enregistrements vers différentes tables en fournissant les informations de routage appropriées à Firehose dans de tels scénarios.

Fournir des informations de routage à Firehose avec expression JSONQuery

Le moyen le plus simple et le plus rentable de fournir des informations de routage d'enregistrements à Firehose consiste à fournir une JSONQuery expression. Avec cette approche, vous fournissez des JSONQuery expressions pour trois paramètres : Database NameTable Name, et (éventuellement)Operation. Firehose utilise l'expression que vous fournissez pour extraire des informations de vos enregistrements de flux entrants afin de les acheminer.

Le Database Name paramètre indique le nom de la base de données de destination. Le Table Name paramètre indique le nom de la table de destination. Operationest un paramètre facultatif qui indique s'il faut insérer l'enregistrement du flux entrant en tant que nouvel enregistrement dans la table de destination, ou s'il faut modifier ou supprimer un enregistrement existant dans la table de destination. Le champ Opération doit avoir l'une des valeurs suivantes — insertupdate, oudelete.

Pour chacun de ces trois paramètres, vous pouvez fournir une valeur statique ou une expression dynamique dans laquelle la valeur est extraite de l'enregistrement entrant. Par exemple, si vous souhaitez transmettre tous les enregistrements de flux entrants à une seule base de données nomméeIoTevents, le nom de la base de données aura une valeur statique de“IoTevents”. Si le nom de la table de destination doit être obtenu à partir d'un champ de l'enregistrement entrant, le nom de la table est une expression dynamique qui indique le champ de l'enregistrement entrant à partir duquel le nom de la table de destination doit être extrait.

Dans l'exemple suivant, nous utilisons une valeur statique pour le nom de la base de données, une valeur dynamique pour le nom de la table et une valeur statique pour le fonctionnement. Notez que la spécification de l'opération est facultative. Si aucune opération n'est spécifiée, Firehose insère les enregistrements entrants dans la table de destination en tant que nouveaux enregistrements par défaut.

Database Name : "IoTevents" Table Name : .deviceId Operation : "insert"

Si le deviceId champ est imbriqué dans l'enregistrement JSON, nous indiquons le nom de la table avec les informations du champ imbriqué sous la forme. .event.deviceId

Note
  • Lorsque vous spécifiez l'opération sous la forme update oudelete, vous devez soit spécifier des clés uniques pour la table de destination lorsque vous configurez votre stream Firehose, soit définir identifier-field-idsdans Iceberg lorsque vous exécutez des opérations de création de table ou de modification de table dans Iceberg. Si vous ne le spécifiez pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.

  • Les Table Name valeurs Database Name et doivent correspondre exactement aux noms de votre base de données et de table de destination. S'ils ne correspondent pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.

Fournir des informations de routage à l'aide d'une AWS Lambda fonction

Il peut arriver que vous disposiez de règles complexes qui déterminent comment acheminer les enregistrements entrants vers une table de destination. Par exemple, vous pouvez avoir une règle qui définit si un champ contient la valeur A, B ou F, qui doit être acheminée vers une table de destination nommée TableX ou vous pouvez augmenter l'enregistrement du flux entrant en ajoutant des attributs supplémentaires. Par exemple, si un enregistrement contient un champ device_id égal à 1, vous pouvez ajouter un autre champ intitulé « modem » et écrire le champ supplémentaire dans la colonne de la table de destination. device_type Dans de tels cas, vous pouvez transformer le flux source à l'aide d'une AWS Lambda fonction dans Firehose et fournir des informations de routage dans le cadre de la sortie de la fonction de transformation Lambda. Pour comprendre comment transformer le flux source à l'aide d'une AWS Lambda fonction dans Firehose, consultez Transformer les données source dans Amazon Data Firehose.

Lorsque vous utilisez Lambda pour la transformation d'un flux source dans Firehose, la sortie doit contenir des paramètres recordIdresult, et/ou. data KafkaRecordValue Le paramètre recordId contient l'enregistrement du flux d'entrée, result indique si la transformation a réussi et data contient la sortie transformée codée en Base64 de votre fonction Lambda. Pour de plus amples informations, veuillez consulter Paramètres requis pour la transformation des données.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1" }

Pour spécifier des informations de routage à Firehose sur la manière de router l'enregistrement du flux vers une table de destination dans le cadre de votre fonction Lambda, la sortie de votre fonction Lambda doit contenir une section supplémentaire pour. metadata L'exemple suivant montre comment la section des métadonnées est ajoutée à la sortie Lambda pour un flux Firehose qui utilise Kinesis Data Streams comme source de données pour indiquer à Firehose qu'il doit insérer l'enregistrement en tant que nouvel enregistrement dans la table nommée de la base de données. Device1234 IoTevents

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

De même, l'exemple suivant montre comment ajouter la section de métadonnées à la sortie Lambda pour un Firehose qui utilise Amazon Managed Streaming pour Apache Kafka comme source de données pour indiquer à Firehose qu'il doit insérer l'enregistrement en tant que nouvel enregistrement dans une table nommée dans la base de données. Device1234 IoTevents

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

Pour cet exemple,

  • destinationDatabaseNamefait référence au nom de la base de données cible et constitue un champ obligatoire.

  • destinationTableNamefait référence au nom de la table cible et constitue un champ obligatoire.

  • operationest un champ facultatif avec des valeurs possibles telles que insertupdate, etdelete. Si vous ne spécifiez aucune valeur, l'opération par défaut estinsert.

Note
  • Lorsque vous spécifiez l'opération sous la forme update oudelete, vous devez soit spécifier des clés uniques pour la table de destination lorsque vous configurez votre stream Firehose, soit définir identifier-field-idsdans Iceberg lorsque vous exécutez des opérations de création de table ou de modification de table dans Iceberg. Si vous ne le spécifiez pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.

  • Les Table Name valeurs Database Name et doivent correspondre exactement aux noms de votre base de données et de table de destination. S'ils ne correspondent pas, Firehose génère une erreur et envoie les données à un compartiment d'erreur S3.

  • Lorsque votre flux Firehose possède à la fois une fonction de transformation Lambda et une expression JSONQuery , Firehose vérifie d'abord la présence du champ de métadonnées dans la sortie Lambda afin de déterminer comment acheminer l'enregistrement vers la table de destination appropriée, puis examine la sortie de votre expression pour détecter les champs manquants. JSONQuery

    Si le Lambda ou l' JSONQuery expression ne fournit pas les informations de routage requises, Firehose suppose qu'il s'agit d'un scénario à table unique et recherche les informations d'une seule table dans la configuration des clés uniques.

    Pour plus d'informations, consultez la table Router les enregistrements entrants vers une seule table Iceberg. Si Firehose ne parvient pas à déterminer les informations de routage et à faire correspondre l'enregistrement à une table de destination spécifiée, il fournit les données au compartiment d'erreur S3 que vous avez spécifié.

Exemple de fonction Lambda

Cette fonction Lambda est un exemple de code Python qui analyse les enregistrements de flux entrants et ajoute des champs obligatoires pour spécifier la manière dont les données doivent être écrites dans des tables spécifiques. Vous pouvez utiliser cet exemple de code pour ajouter la section des métadonnées pour les informations de routage.

import json import base64 def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']) firehose_records_output = {} firehose_records_output['records'] = [] for firehose_record_input in firehose_records_input['records']: # Get payload from Lambda input, it could be different with different sources if 'kafkaRecordValue' in firehose_record_input: payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8') else payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8') # perform data processing on customer payload bytes here # Create output with proper record ID, output data (may be different with different sources), result, and metadata firehose_record_output = {} if 'kafkaRecordValue' in firehose_record_input: firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')) else firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')) firehose_record_output['recordId'] = firehose_record_input['recordId'] firehose_record_output['result'] = 'Ok' firehose_record_output['metadata'] = { 'otfMetadata': { 'destinationDatabaseName': 'your_destination_database', 'destinationTableName': 'your_destination_table', 'operation': 'insert' } } firehose_records_output['records'].append(firehose_record_output) return firehose_records_output