Leiten Sie eingehende Datensätze an verschiedene Iceberg-Tabellen weiter - Amazon Data Firehose

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Leiten Sie eingehende Datensätze an verschiedene Iceberg-Tabellen weiter

Amazon Data Firehose kann eingehende Datensätze in einem Stream basierend auf dem Inhalt des Datensatzes an verschiedene Iceberg-Tabellen weiterleiten. Die Aufzeichnungen werden nicht in der richtigen Reihenfolge aufbewahrt, wenn sie von Amazon Data Firehose geliefert werden. Betrachten Sie den folgenden Beispiel-Eingabedatensatz.

{ "deviceId": "Device1234", "timestamp": "2024-11-28T11:30:00Z", "data": { "temperature": 21.5, "location": { "latitude": 37.3324, "longitude": -122.0311 } }, "powerlevel": 84, "status": "online" }
{ "deviceId": "Device4567", "timestamp": "2023-11-28T10:40:00Z", "data": { "pressure": 1012.4, "location": { "zipcode": 24567 } }, "powerlevel": 82, "status": "online" }

In diesem Beispiel hat das deviceId Feld zwei mögliche Werte — Device1234 undDevice4567. Wenn ein eingehender Datensatz das deviceId Feld als enthältDevice1234, möchten wir den Datensatz in eine Eisberg-Tabelle mit dem Namen schreibenDevice1234, und wenn ein eingehender Datensatz das deviceId Feld als enthältDevice4567, möchten wir den Datensatz in eine Tabelle mit dem Namen Device4567 schreiben.

Beachten Sie, dass die Datensätze unterschiedliche Felder enthalten Device1234 und Device4567 möglicherweise unterschiedliche Felder haben, die unterschiedlichen Spalten in der entsprechenden Eisberg-Tabelle zugeordnet sind. Die eingehenden Datensätze haben möglicherweise eine verschachtelte JSON-Struktur, in der sie innerhalb des JSON-Datensatzes verschachtelt werden deviceIdkönnen. In den nächsten Abschnitten besprechen wir, wie Sie Datensätze an verschiedene Tabellen weiterleiten können, indem Sie Firehose in solchen Szenarien die entsprechenden Routing-Informationen zur Verfügung stellen.

Stellen Sie Firehose Routing-Informationen mit JSONQuery Ausdruck zur Verfügung

Die einfachste und kostengünstigste Möglichkeit, Firehose Datensatz-Routing-Informationen zur Verfügung zu stellen, ist die Bereitstellung eines JSONQuery Ausdrucks. Bei diesem Ansatz stellen Sie JSONQuery Ausdrücke für drei Parameter bereit — Database NameTable Name, und (optional)Operation. Firehose verwendet den von Ihnen angegebenen Ausdruck, um Informationen aus Ihren eingehenden Stream-Datensätzen zu extrahieren, um die Datensätze weiterzuleiten.

Der Database Name Parameter gibt den Namen der Zieldatenbank an. Der Table Name Parameter gibt den Namen der Zieltabelle an. Operationist ein optionaler Parameter, der angibt, ob der eingehende Stream-Datensatz als neuer Datensatz in die Zieltabelle eingefügt oder ein vorhandener Datensatz in der Zieltabelle geändert oder gelöscht werden soll. Das Feld Operation muss einen der folgenden Werte haben — insertupdate, oderdelete.

Für jeden dieser drei Parameter können Sie entweder einen statischen Wert oder einen dynamischen Ausdruck angeben, wobei der Wert aus dem eingehenden Datensatz abgerufen wird. Wenn Sie beispielsweise alle eingehenden Stream-Datensätze an eine einzige Datenbank mit dem Namen senden möchtenIoTevents, hätte der Datenbankname den statischen Wert“IoTevents”. Wenn der Name der Zieltabelle aus einem Feld im eingehenden Datensatz abgerufen werden muss, ist der Tabellenname ein dynamischer Ausdruck, der das Feld im eingehenden Datensatz angibt, aus dem der Name der Zieltabelle abgerufen werden muss.

Im folgenden Beispiel verwenden wir einen statischen Wert für den Datenbanknamen, einen dynamischen Wert für den Tabellennamen und einen statischen Wert für den Vorgang. Beachten Sie, dass die Angabe der Operation optional ist. Wenn keine Operation angegeben ist, fügt Firehose die eingehenden Datensätze standardmäßig als neue Datensätze in die Zieltabelle ein.

Database Name : "IoTevents" Table Name : .deviceId Operation : "insert"

Wenn das deviceId Feld innerhalb des JSON-Datensatzes verschachtelt ist, geben wir den Tabellennamen mit den verschachtelten Feldinformationen als an. .event.deviceId

Anmerkung
  • Wenn Sie die Operation als update oder angebendelete, müssen Sie entweder eindeutige Schlüssel für die Zieltabelle angeben, wenn Sie Ihren Firehose-Stream einrichten, oder identifier-field-idsin Iceberg festlegen, wenn Sie Operationen zum Erstellen von Tabellen oder Ändern von Tabellen in Iceberg ausführen. Wenn Sie dies nicht angeben, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.

  • Die Table Name Werte Database Name und müssen exakt mit Ihren Zieldatenbank- und Tabellennamen übereinstimmen. Wenn sie nicht übereinstimmen, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.

Stellen Sie Routing-Informationen mithilfe einer Funktion bereit AWS Lambda

Es kann Szenarien geben, in denen Sie über komplexe Regeln verfügen, die bestimmen, wie eingehende Datensätze an eine Zieltabelle weitergeleitet werden. Möglicherweise haben Sie eine Regel, die definiert, ob ein Feld den Wert A, B oder F enthält, der an eine Zieltabelle mit dem Namen weitergeleitet werden soll, TableX oder Sie möchten den Datensatz für den Eingangsstream erweitern, indem Sie zusätzliche Attribute hinzufügen. Wenn ein Datensatz beispielsweise ein Feld device_id als 1 enthält, möchten Sie vielleicht ein weiteres Feld mit dem Namen „Modem“ hinzufügen und das zusätzliche Feld in die Spalte der Zieltabelle schreiben. device_type In solchen Fällen können Sie den Quellstream mithilfe einer AWS Lambda Funktion in Firehose transformieren und Routing-Informationen als Teil der Ausgabe der Lambda-Transformationsfunktion bereitstellen. Informationen dazu, wie Sie den Quellstream mithilfe einer AWS Lambda Funktion in Firehose transformieren können, finden Sie unter Transformieren von Quelldaten in Amazon Data Firehose.

Wenn Sie Lambda für die Transformation eines Quellstreams in Firehose verwenden, muss die Ausgabe die Parameter recordIdresult, und data oder KafkaRecordValue enthalten. Der Parameter recordId enthält den Eingabestream-Datensatz, result gibt an, ob die Transformation erfolgreich war, und data enthält die Base64-kodierte transformierte Ausgabe Ihrer Lambda-Funktion. Weitere Informationen finden Sie unter Erforderliche Parameter für die Datentransformation.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1" }

Um Firehose Routing-Informationen darüber zu geben, wie der Stream-Datensatz als Teil Ihrer Lambda-Funktion an eine Zieltabelle weitergeleitet werden soll, muss die Ausgabe Ihrer Lambda-Funktion einen zusätzlichen Abschnitt für enthalten. metadata Das folgende Beispiel zeigt, wie der Metadatenabschnitt zur Lambda-Ausgabe für einen Firehose-Stream hinzugefügt wird, der Kinesis Data Streams als Datenquelle verwendet, um Firehose anzuweisen, den Datensatz als neuen Datensatz in die Tabelle mit dem Namen der Datenbank einzufügen. Device1234 IoTevents

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

In ähnlicher Weise zeigt das folgende Beispiel, wie Sie den Metadatenabschnitt zur Lambda-Ausgabe für eine Firehose hinzufügen können, die Amazon Managed Streaming for Apache Kafka als Datenquelle verwendet, um Firehose anzuweisen, den Datensatz als neuen Datensatz in eine in der Datenbank benannte Tabelle einzufügen. Device1234 IoTevents

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

Für dieses Beispiel

  • destinationDatabaseNamebezieht sich auf den Namen der Zieldatenbank und ist ein Pflichtfeld.

  • destinationTableNamebezieht sich auf den Namen der Zieltabelle und ist ein Pflichtfeld.

  • operationist ein optionales Feld mit möglichen Werten wie insertupdate, unddelete. Wenn Sie keine Werte angeben, ist die Standardoperationinsert.

Anmerkung
  • Wenn Sie die Operation als update oder angebendelete, müssen Sie entweder eindeutige Schlüssel für die Zieltabelle angeben, wenn Sie Ihren Firehose-Stream einrichten, oder identifier-field-idsin Iceberg festlegen, wenn Sie Operationen zum Erstellen von Tabellen oder Ändern von Tabellen in Iceberg ausführen. Wenn Sie dies nicht angeben, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.

  • Die Table Name Werte Database Name und müssen exakt mit Ihren Zieldatenbank- und Tabellennamen übereinstimmen. Wenn sie nicht übereinstimmen, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.

  • Wenn Ihr Firehose-Stream sowohl eine Lambda-Transformationsfunktion als auch einen JSONQuery Ausdruck enthält, sucht Firehose zunächst nach dem Metadatenfeld in der Lambda-Ausgabe, um festzustellen, wie der Datensatz an die entsprechende Zieltabelle weitergeleitet werden soll, und sucht dann in der Ausgabe Ihres JSONQuery Ausdrucks nach fehlenden Feldern.

    Wenn das Lambda oder der JSONQuery Ausdruck nicht die erforderlichen Routing-Informationen bereitstellt, geht Firehose davon aus, dass es sich um ein Einzeltabellenszenario handelt, und sucht in der Konfiguration der eindeutigen Schlüssel nach Einzeltabelleninformationen.

    Weitere Informationen finden Sie unter Eingehende Datensätze an eine einzelne Iceberg-Tabelle weiterleiten. Wenn Firehose die Routing-Informationen nicht ermitteln und den Datensatz nicht mit einer bestimmten Zieltabelle abgleichen kann, werden die Daten an Ihren angegebenen S3-Fehler-Bucket gesendet.

Beispiel-Lambda-Funktion

Diese Lambda-Funktion ist ein Python-Beispielcode, der die eingehenden Stream-Datensätze analysiert und erforderliche Felder hinzufügt, um anzugeben, wie die Daten in bestimmte Tabellen geschrieben werden sollen. Sie können diesen Beispielcode verwenden, um den Metadatenbereich für Routing-Informationen hinzuzufügen.

import json import base64 def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']) firehose_records_output = {} firehose_records_output['records'] = [] for firehose_record_input in firehose_records_input['records']: # Get payload from Lambda input, it could be different with different sources if 'kafkaRecordValue' in firehose_record_input: payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8') else payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8') # perform data processing on customer payload bytes here # Create output with proper record ID, output data (may be different with different sources), result, and metadata firehose_record_output = {} if 'kafkaRecordValue' in firehose_record_input: firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')) else firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')) firehose_record_output['recordId'] = firehose_record_input['recordId'] firehose_record_output['result'] = 'Ok' firehose_record_output['metadata'] = { 'otfMetadata': { 'destinationDatabaseName': 'your_destination_database', 'destinationTableName': 'your_destination_table', 'operation': 'insert' } } firehose_records_output['records'].append(firehose_record_output) return firehose_records_output