

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Leiten Sie eingehende Datensätze an verschiedene Iceberg-Tabellen weiter
<a name="apache-iceberg-format-input-record-different"></a>

Amazon Data Firehose kann eingehende Datensätze in einem Stream basierend auf dem Inhalt des Datensatzes an verschiedene Iceberg-Tabellen weiterleiten. Die Aufzeichnungen werden nicht in der richtigen Reihenfolge aufbewahrt, wenn sie von Amazon Data Firehose geliefert werden. Betrachten Sie den folgenden Beispiel-Eingabedatensatz. 

```
{
  "deviceId": "Device1234",
  "timestamp": "2024-11-28T11:30:00Z",
  "data": {
    "temperature": 21.5,
    "location": {
      "latitude": 37.3324,
      "longitude": -122.0311
    }
  },
  "powerlevel": 84,
  "status": "online"
}
```

```
{
  "deviceId": "Device4567",
  "timestamp": "2023-11-28T10:40:00Z",
  "data": {
    "pressure": 1012.4,
    "location": {
      "zipcode": 24567
    }
  },
  "powerlevel": 82,
  "status": "online"
}
```

In diesem Beispiel hat das `deviceId` Feld zwei mögliche Werte — `Device1234` und`Device4567`. Wenn ein eingehender Datensatz das `deviceId` Feld als enthält`Device1234`, möchten wir den Datensatz in eine Eisberg-Tabelle mit dem Namen schreiben`Device1234`, und wenn ein eingehender Datensatz das `deviceId` Feld als enthält`Device4567`, möchten wir den Datensatz in eine Tabelle mit dem Namen `Device4567` schreiben. 

Beachten Sie, dass die Datensätze unterschiedliche Felder enthalten `Device1234` und `Device4567` möglicherweise unterschiedliche Felder haben, die unterschiedlichen Spalten in der entsprechenden Eisberg-Tabelle zugeordnet sind. Die eingehenden Datensätze haben möglicherweise eine verschachtelte JSON-Struktur, in der sie innerhalb des JSON-Datensatzes verschachtelt werden **`deviceId`**können. In den nächsten Abschnitten besprechen wir, wie Sie Datensätze an verschiedene Tabellen weiterleiten können, indem Sie Firehose in solchen Szenarien die entsprechenden Routing-Informationen zur Verfügung stellen. 

## Stellen Sie Firehose Routing-Informationen mit JSONQuery Ausdruck zur Verfügung
<a name="apache-iceberg-route-jq"></a>

Die einfachste und kostengünstigste Möglichkeit, Firehose Datensatz-Routing-Informationen zur Verfügung zu stellen, ist die Bereitstellung eines JSONQuery Ausdrucks. Bei diesem Ansatz stellen Sie JSONQuery Ausdrücke für drei Parameter bereit — `Database Name``Table Name`, und (optional)`Operation`. Firehose verwendet den von Ihnen angegebenen Ausdruck, um Informationen aus Ihren eingehenden Stream-Datensätzen zu extrahieren, um die Datensätze weiterzuleiten. 

Der `Database Name` Parameter gibt den Namen der Zieldatenbank an. Der `Table Name` Parameter gibt den Namen der Zieltabelle an. `Operation`ist ein optionaler Parameter, der angibt, ob der eingehende Stream-Datensatz als neuer Datensatz in die Zieltabelle eingefügt oder ein vorhandener Datensatz in der Zieltabelle geändert oder gelöscht werden soll. Das Feld Operation muss einen der folgenden Werte haben — `insert``update`, oder`delete`. 

Für jeden dieser drei Parameter können Sie entweder einen statischen Wert oder einen dynamischen Ausdruck angeben, wobei der Wert aus dem eingehenden Datensatz abgerufen wird. Wenn Sie beispielsweise alle eingehenden Stream-Datensätze an eine einzige Datenbank mit dem Namen senden möchten`IoTevents`, hätte der Datenbankname den statischen Wert`“IoTevents”`. Wenn der Name der Zieltabelle aus einem Feld im eingehenden Datensatz abgerufen werden muss, ist der Tabellenname ein dynamischer Ausdruck, der das Feld im eingehenden Datensatz angibt, aus dem der Name der Zieltabelle abgerufen werden muss. 

Im folgenden Beispiel verwenden wir einen statischen Wert für den Datenbanknamen, einen dynamischen Wert für den Tabellennamen und einen statischen Wert für den Vorgang. Beachten Sie, dass die Angabe der Operation optional ist. Wenn keine Operation angegeben ist, fügt Firehose die eingehenden Datensätze standardmäßig als neue Datensätze in die Zieltabelle ein. 

```
Database Name : "IoTevents"
Table Name : .deviceId
Operation : "insert"
```

Wenn das `deviceId` Feld innerhalb des JSON-Datensatzes verschachtelt ist, geben wir den Tabellennamen mit den verschachtelten Feldinformationen als an. `.event.deviceId` 

**Anmerkung**  
Wenn Sie die Operation als `update` oder angeben`delete`, müssen Sie entweder eindeutige Schlüssel für die Zieltabelle angeben, wenn Sie Ihren Firehose-Stream einrichten, oder [identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)in Iceberg festlegen, wenn Sie Operationen zum [Erstellen von Tabellen oder [Ändern](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields) von Tabellen](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table) in Iceberg ausführen. Wenn Sie dies nicht angeben, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.
Die `Table Name` Werte `Database Name` und müssen exakt mit Ihren Zieldatenbank- und Tabellennamen übereinstimmen. Wenn sie nicht übereinstimmen, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.

## Stellen Sie Routing-Informationen mithilfe einer Funktion bereit AWS Lambda
<a name="apache-iceberg-route-lambda"></a>

Es kann Szenarien geben, in denen Sie über komplexe Regeln verfügen, die bestimmen, wie eingehende Datensätze an eine Zieltabelle weitergeleitet werden. Möglicherweise haben Sie eine Regel, die definiert, ob ein Feld den Wert A, B oder F enthält, der an eine Zieltabelle mit dem Namen weitergeleitet werden soll, `TableX` oder Sie möchten den Datensatz für den Eingangsstream erweitern, indem Sie zusätzliche Attribute hinzufügen. Wenn ein Datensatz beispielsweise ein Feld `device_id` als 1 enthält, möchten Sie vielleicht ein weiteres Feld mit dem Namen „Modem“ hinzufügen und das zusätzliche Feld in die Spalte der Zieltabelle schreiben. `device_type` In solchen Fällen können Sie den Quellstream mithilfe einer AWS Lambda Funktion in Firehose transformieren und Routing-Informationen als Teil der Ausgabe der Lambda-Transformationsfunktion bereitstellen. Informationen dazu, wie Sie den Quellstream mithilfe einer AWS Lambda Funktion in Firehose transformieren können, finden Sie unter [Transformieren von Quelldaten in Amazon Data Firehose](data-transformation.md).

Wenn Sie Lambda für die Transformation eines Quellstreams in Firehose verwenden, muss die Ausgabe die Parameter `recordId``result`, und `data` oder `KafkaRecordValue` enthalten. Der Parameter `recordId` enthält den Eingabestream-Datensatz, `result` gibt an, ob die Transformation erfolgreich war, und `data` enthält die Base64-kodierte transformierte Ausgabe Ihrer Lambda-Funktion. Weitere Informationen finden Sie unter [Erforderliche Parameter für die Datentransformation](data-transformation-status-model.md).

```
{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1"
}
```

Um Firehose Routing-Informationen darüber zu geben, wie der Stream-Datensatz als Teil Ihrer Lambda-Funktion an eine Zieltabelle weitergeleitet werden soll, muss die Ausgabe Ihrer Lambda-Funktion einen zusätzlichen Abschnitt für enthalten. `metadata` Das folgende Beispiel zeigt, wie der Metadatenabschnitt zur Lambda-Ausgabe für einen Firehose-Stream hinzugefügt wird, der Kinesis Data Streams als Datenquelle verwendet, um Firehose anzuweisen, den Datensatz als neuen Datensatz in die Tabelle mit dem Namen der Datenbank einzufügen. `Device1234` `IoTevents`

```
 {
"recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    
    "metadata":{
"otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }
```

In ähnlicher Weise zeigt das folgende Beispiel, wie Sie den Metadatenabschnitt zur Lambda-Ausgabe für eine Firehose hinzufügen können, die Amazon Managed Streaming for Apache Kafka als Datenquelle verwendet, um Firehose anzuweisen, den Datensatz als neuen Datensatz in eine in der Datenbank benannte Tabelle einzufügen. `Device1234` `IoTevents`

```
{
"recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    
    "metadata":{
"otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
  }
```

Für dieses Beispiel 
+ `destinationDatabaseName`bezieht sich auf den Namen der Zieldatenbank und ist ein Pflichtfeld.
+ `destinationTableName`bezieht sich auf den Namen der Zieltabelle und ist ein Pflichtfeld.
+ `operation`ist ein optionales Feld mit möglichen Werten wie `insert``update`, und`delete`. Wenn Sie keine Werte angeben, ist die Standardoperation`insert`.

**Anmerkung**  
Wenn Sie die Operation als `update` oder angeben`delete`, müssen Sie entweder eindeutige Schlüssel für die Zieltabelle angeben, wenn Sie Ihren Firehose-Stream einrichten, oder [identifier-field-ids](https://iceberg.apache.org/spec/#identifier-field-ids)in Iceberg festlegen, wenn Sie Operationen zum [Erstellen von Tabellen oder [Ändern](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#alter-table-set-identifier-fields) von Tabellen](https://iceberg.apache.org/docs/1.5.1/spark-ddl/#create-table) in Iceberg ausführen. Wenn Sie dies nicht angeben, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.
Die `Table Name` Werte `Database Name` und müssen exakt mit Ihren Zieldatenbank- und Tabellennamen übereinstimmen. Wenn sie nicht übereinstimmen, gibt Firehose einen Fehler aus und übermittelt Daten an einen S3-Fehler-Bucket.
Wenn Ihr Firehose-Stream sowohl eine Lambda-Transformationsfunktion als auch einen JSONQuery Ausdruck enthält, sucht Firehose zunächst nach dem Metadatenfeld in der Lambda-Ausgabe, um festzustellen, wie der Datensatz an die entsprechende Zieltabelle weitergeleitet werden soll, und sucht dann in der Ausgabe Ihres JSONQuery Ausdrucks nach fehlenden Feldern.   
Wenn das Lambda oder der JSONQuery Ausdruck nicht die erforderlichen Routing-Informationen bereitstellt, geht Firehose davon aus, dass es sich um ein Einzeltabellenszenario handelt, und sucht in der Konfiguration der eindeutigen Schlüssel nach Einzeltabelleninformationen.   
Weitere Informationen finden Sie unter [Eingehende Datensätze an eine einzelne Iceberg-Tabelle weiterleiten](apache-iceberg-format-input-record.md). Wenn Firehose die Routing-Informationen nicht ermitteln und den Datensatz nicht mit einer bestimmten Zieltabelle abgleichen kann, werden die Daten an Ihren angegebenen S3-Fehler-Bucket gesendet. 

### Beispiel-Lambda-Funktion
<a name="format-record-lambda"></a>

Diese Lambda-Funktion ist ein Python-Beispielcode, der die eingehenden Stream-Datensätze analysiert und erforderliche Felder hinzufügt, um anzugeben, wie die Daten in bestimmte Tabellen geschrieben werden sollen. Sie können diesen Beispielcode verwenden, um den Metadatenbereich für Routing-Informationen hinzuzufügen.

```
import json
import base64
 
 
def lambda_handler(firehose_records_input, context):
    print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'])
 
    firehose_records_output = {}
    firehose_records_output['records'] = []
 
 
    for firehose_record_input in firehose_records_input['records']:
 
        # Get payload from Lambda input, it could be different with different sources
        if 'kafkaRecordValue' in firehose_record_input:
            payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8')
        else
            payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8')
 
        # perform data processing on customer payload bytes here
 
        # Create output with proper record ID, output data (may be different with different sources), result, and metadata
        firehose_record_output = {}
 
        if 'kafkaRecordValue' in firehose_record_input:
            firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8'))
        else
            firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8'))
        
        firehose_record_output['recordId'] = firehose_record_input['recordId']
        firehose_record_output['result'] =  'Ok'
        firehose_record_output['metadata'] = {
            'otfMetadata': {
                'destinationDatabaseName': 'your_destination_database',
                'destinationTableName': 'your_destination_table',
                'operation': 'insert'
                }
            }
        firehose_records_output['records'].append(firehose_record_output)
    return firehose_records_output
```