Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Indirizza i record in entrata a diverse tabelle Iceberg
Amazon Data Firehose può indirizzare i record in entrata in un flusso verso diverse tabelle Iceberg in base al contenuto del record. I registri non vengono mantenuti in ordine quando vengono consegnati da Amazon Data Firehose. Considera il seguente record di input di esempio.
{ "deviceId": "Device1234", "timestamp": "2024-11-28T11:30:00Z", "data": { "temperature": 21.5, "location": { "latitude": 37.3324, "longitude": -122.0311 } }, "powerlevel": 84, "status": "online" }
{ "deviceId": "Device4567", "timestamp": "2023-11-28T10:40:00Z", "data": { "pressure": 1012.4, "location": { "zipcode": 24567 } }, "powerlevel": 82, "status": "online" }
In questo esempio, il deviceId campo ha due valori possibili: Device1234 eDevice4567. Quando un record in entrata ha il deviceId campo asDevice1234, vogliamo scrivere il record in una tabella Iceberg denominataDevice1234, e quando un record in entrata ha il deviceId campo asDevice4567, vogliamo scrivere il record in una tabella denominata. Device4567
Nota che i record con Device1234 e Device4567 potrebbero avere un set diverso di campi mappati a colonne diverse nella tabella Iceberg corrispondente. I record in entrata potrebbero avere una struttura JSON annidata in cui deviceIdpossono essere annidati all'interno del record JSON. Nelle prossime sezioni, discuteremo di come indirizzare i record a tabelle diverse fornendo le informazioni di routing appropriate a Firehose in tali scenari.
Fornire informazioni di routing a JSONQuery Firehose con espressione
Il modo più semplice ed economico per fornire informazioni sul routing dei record a Firehose è fornire JSONQuery un'espressione. Con questo approccio, si forniscono JSONQuery espressioni per tre parametri: Database NameTable Name, e (facoltativamente). Operation Firehose utilizza l'espressione fornita dall'utente per estrarre informazioni dai record dei flussi in entrata per instradare i record.
Il Database Name parametro specifica il nome del database di destinazione. Il Table Name parametro specifica il nome della tabella di destinazione. Operationè un parametro opzionale che indica se inserire il record dello stream in entrata come nuovo record nella tabella di destinazione o modificare o eliminare un record esistente nella tabella di destinazione. Il campo Operazione deve avere uno dei seguenti valori: insertupdate, odelete.
Per ognuno di questi tre parametri, è possibile fornire un valore statico o un'espressione dinamica in cui il valore viene recuperato dal record in entrata. Ad esempio, se si desidera inviare tutti i record di flusso in entrata a un unico database denominatoIoTevents, il nome del database avrà un valore statico di. “IoTevents” Se il nome della tabella di destinazione deve essere ottenuto da un campo del record in entrata, il nome della tabella è un'espressione dinamica che specifica il campo nel record in entrata da cui deve essere recuperato il nome della tabella di destinazione.
Nell'esempio seguente, utilizziamo un valore statico per Database Name, un valore dinamico per Table Name e un valore statico per l'operazione. Si noti che la specificazione dell'operazione è facoltativa. Se non viene specificata alcuna operazione, Firehose inserisce i record in entrata nella tabella di destinazione come nuovi record per impostazione predefinita.
Database Name : "IoTevents" Table Name : .deviceId Operation : "insert"
Se il deviceId campo è nidificato all'interno del record JSON, specifichiamo Table Name con le informazioni sul campo nidificato come. .event.deviceId
Nota
-
Quando si specifica l'operazione come
updateodelete, è necessario specificare chiavi univoche per la tabella di destinazione quando si imposta il flusso Firehose o impostarle identifier-field-idsin Iceberg quando si eseguono le operazioni di creazione tabella o modifica tabella in Iceberg. Se non lo specifichi, Firehose genera un errore e invia i dati a un bucket di errore S3. -
I
Table NamevaloriDatabase Nameand devono corrispondere esattamente ai nomi delle tabelle e del database di destinazione. Se non corrispondono, Firehose genera un errore e invia i dati a un bucket di errore S3.
Fornisci informazioni di routing utilizzando una funzione AWS Lambda
Potrebbero esserci scenari in cui sono presenti regole complesse che determinano come indirizzare i record in entrata a una tabella di destinazione. Ad esempio, potresti avere una regola che definisce se un campo contiene il valore A, B o F, che deve essere indirizzato a una tabella di destinazione denominata TableX oppure potresti voler aumentare il record dello stream in entrata aggiungendo attributi aggiuntivi. Ad esempio, se un record contiene un campo device_id come 1, potresti voler aggiungere un altro campo device_type come «modem «e scrivere il campo aggiuntivo nella colonna della tabella di destinazione. In questi casi, è possibile trasformare il flusso di origine utilizzando una AWS Lambda funzione in Firehose e fornire informazioni di routing come parte dell'output della funzione di trasformazione Lambda. Per capire come trasformare il flusso di origine utilizzando una AWS Lambda funzione in Firehose, consulta Trasformare i dati di origine in Amazon Data Firehose.
Quando si utilizza Lambda per la trasformazione di un flusso di origine in Firehose, l'output deve contenere e o recordId parametriresult. data KafkaRecordValue Il parametro recordId contiene il record del flusso di input, result indica se la trasformazione ha avuto successo e data contiene l'output trasformato con codifica Base64 della funzione Lambda. Per ulteriori informazioni, consulta Parametri richiesti per la trasformazione dei dati.
{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1" }
Per specificare a Firehose informazioni di routing su come indirizzare il record dello stream a una tabella di destinazione come parte della funzione Lambda, l'output della funzione Lambda deve contenere una sezione aggiuntiva per. metadata L'esempio seguente mostra come la sezione dei metadati viene aggiunta all'output Lambda per un flusso Firehose che utilizza Kinesis Data Streams come origine dati per indicare a Firehose che deve inserire il record come nuovo record nella tabella denominata del database. Device1234 IoTevents
{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }
Analogamente, l'esempio seguente mostra come aggiungere la sezione dei metadati all'output Lambda per un Firehose che utilizza Amazon Managed Streaming for Apache Kafka come origine dati per indicare a Firehose che deve inserire il record come nuovo record in una tabella denominata nel database. Device1234 IoTevents
{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }
Per questo esempio,
-
destinationDatabaseNamesi riferisce al nome del database di destinazione ed è un campo obbligatorio. -
destinationTableNamesi riferisce al nome della tabella di destinazione ed è un campo obbligatorio. -
operationè un campo opzionale con valori possibili comeinsertupdate, edelete. Se non si specifica alcun valore, l'operazione predefinita èinsert.
Nota
-
Quando si specifica l'operazione come
updateodelete, è necessario specificare chiavi univoche per la tabella di destinazione quando si imposta il flusso Firehose o impostarle identifier-field-idsin Iceberg quando si eseguono le operazioni di creazione tabella o modifica tabella in Iceberg. Se non lo specifichi, Firehose genera un errore e invia i dati a un bucket di errore S3. -
I
Table NamevaloriDatabase Nameand devono corrispondere esattamente ai nomi delle tabelle e del database di destinazione. Se non corrispondono, Firehose genera un errore e invia i dati a un bucket di errore S3. -
Quando il flusso Firehose ha sia una funzione di trasformazione Lambda che un'espressione JSONQuery , Firehose verifica innanzitutto il campo dei metadati nell'output Lambda per determinare come indirizzare il record alla tabella di destinazione appropriata, quindi esamina l'output dell'espressione per individuare i campi mancanti. JSONQuery
Se la Lambda o JSONQuery l'espressione non forniscono le informazioni di routing richieste, Firehose lo presuppone come uno scenario a tabella singola e cerca le informazioni su una singola tabella nella configurazione delle chiavi uniche.
Per ulteriori informazioni, consulta la tabella Instradare i record in entrata verso una singola tabella Iceberg. Se Firehose non riesce a determinare le informazioni di routing e ad abbinare il record a una tabella di destinazione specificata, invia i dati al bucket di errore S3 specificato.
Funzione Lambda di esempio
Questa funzione Lambda è un esempio di codice Python che analizza i record dei flussi in entrata e aggiunge i campi obbligatori per specificare come i dati devono essere scritti in tabelle specifiche. È possibile utilizzare questo codice di esempio per aggiungere la sezione dei metadati per le informazioni di routing.
import json import base64 def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']) firehose_records_output = {} firehose_records_output['records'] = [] for firehose_record_input in firehose_records_input['records']: # Get payload from Lambda input, it could be different with different sources if 'kafkaRecordValue' in firehose_record_input: payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8') else payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8') # perform data processing on customer payload bytes here # Create output with proper record ID, output data (may be different with different sources), result, and metadata firehose_record_output = {} if 'kafkaRecordValue' in firehose_record_input: firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')) else firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')) firehose_record_output['recordId'] = firehose_record_input['recordId'] firehose_record_output['result'] = 'Ok' firehose_record_output['metadata'] = { 'otfMetadata': { 'destinationDatabaseName': 'your_destination_database', 'destinationTableName': 'your_destination_table', 'operation': 'insert' } } firehose_records_output['records'].append(firehose_record_output) return firehose_records_output