Enrutamiento de los registros entrantes a diferentes tablas de Iceberg - Amazon Data Firehose

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Enrutamiento de los registros entrantes a diferentes tablas de Iceberg

Amazon Data Firehose puede enrutar los registros entrantes de un flujo a diferentes tablas de Iceberg en función del contenido del registro. Los registros no se mantienen en orden cuando se entregan desde Amazon Data Firehose. Observe el siguiente registro de entrada de ejemplo.

{ "deviceId": "Device1234", "timestamp": "2024-11-28T11:30:00Z", "data": { "temperature": 21.5, "location": { "latitude": 37.3324, "longitude": -122.0311 } }, "powerlevel": 84, "status": "online" }
{ "deviceId": "Device4567", "timestamp": "2023-11-28T10:40:00Z", "data": { "pressure": 1012.4, "location": { "zipcode": 24567 } }, "powerlevel": 82, "status": "online" }

En este ejemplo, el campo deviceId tiene dos valores posibles: Device1234 y Device4567. Cuando un registro entrante tiene un campo deviceId como Device1234, vamos a escribir el registro en una tabla de Iceberg denominada Device1234, y cuando un registro entrante tiene un campo deviceId como Device4567, vamos a escribir el registro en una tabla denominada Device4567.

Tenga en cuenta que los registros con Device1234 y Device4567 pueden tener un conjunto diferente de campos que se asignan a diferentes columnas de la tabla de Iceberg correspondiente. Los registros entrantes pueden tener una estructura JSON anidada que permita que el deviceId esté anidado dentro del registro JSON. En las próximas secciones, analizaremos cómo se pueden enrutar los registros a diferentes tablas proporcionando la información de enrutamiento adecuada a Firehose en estas situaciones.

Proporcione información de enrutamiento a Firehose con expresión JSONQuery

La forma más sencilla y rentable de proporcionar información de enrutamiento de registros a Firehose es proporcionar una JSONQuery expresión. Con este enfoque, se proporcionan JSONQuery expresiones para tres parámetros: Database NameTable Name, y (opcionalmente). Operation Firehose usa la expresión que usted proporciona para extraer información de los registros de los flujos entrantes para enrutarlos.

El parámetro Database Name especifica el nombre de la base de datos de destino. El parámetro Table Name especifica el nombre de la tabla de destino. Operation es un parámetro opcional que indica si se debe insertar el registro del flujo entrante como un registro nuevo en la tabla de destino o si se debe modificar o eliminar un registro existente en la tabla de destino. El campo Operación debe tener uno de los siguientes valores: insert, update o delete.

Para cada uno de estos tres parámetros, puede proporcionar un valor estático o una expresión dinámica en la que el valor se recupere del registro entrante. Por ejemplo, si desea entregar todos los registros de flujos entrantes a una única base de datos denominada IoTevents, el Nombre de la base de datos tendría un valor estático de “IoTevents”. Si el nombre de la tabla de destino debe obtenerse de un campo del registro entrante, el nombre de la tabla es una expresión dinámica que especifica el campo del registro entrante del que se debe recuperar el nombre de la tabla de destino.

En el siguiente ejemplo, utilizamos un valor estático para el Nombre de la base de datos, un valor dinámico para el Nombre de la tabla y un valor estático para la operación. Tenga en cuenta que especificar la operación es opcional. Si no se especifica ninguna operación, Firehose inserta los registros entrantes en la tabla de destino como registros nuevos de forma predeterminada.

Database Name : "IoTevents" Table Name : .deviceId Operation : "insert"

Si el campo deviceId está anidado dentro del registro JSON, hay que especificar el nombre de la tabla con la información del campo anidado como .event.deviceId.

nota
  • Si especificas la operación como update odelete, debes especificar claves únicas para la tabla de destino cuando configuras tu transmisión Firehose, o establecerla identifier-field-idsen Iceberg cuando ejecutas las operaciones de creación de tabla o modificación de tabla en Iceberg. Si no se especifica, Firehose generará un error y enviará los datos a un bucket de errores de S3.

  • Los valores de Database Name y Table Name deben coincidir exactamente con los nombres de la base de datos y la tabla de destino. Si no coinciden, Firehose arroja un error y envía los datos a un bucket de errores de S3.

Suministro de información de enrutamiento mediante una función de AWS Lambda

Puede haber situaciones en las que tenga reglas complejas que determinen cómo enrutar los registros entrantes a una tabla de destino. Por ejemplo, puede tener una regla que defina si un campo contiene el valor A, B o F, que deba enrutarse a una tabla de destino denominada TableX o puede que desee aumentar el registro de flujos entrantes añadiendo atributos adicionales. Por ejemplo, si un registro contiene un campo device_id como 1, puede añadir otro campo device_type como “módem” y escribir el campo adicional en la columna de la tabla de destino. En esos casos, puede transformar el flujo de origen mediante una AWS Lambda función de Firehose y proporcionar información de enrutamiento como parte del resultado de la función de transformación Lambda. Para saber cómo puede transformar la transmisión de origen mediante una AWS Lambda función de Firehose, consulte Transformar los datos de origen en Amazon Data Firehose.

Cuando se utiliza Lambda para la transformación del flujo de origen en Firehose, la salida debe contener los parámetros recordId, result y data o KafkaRecordValue. El parámetro recordId contiene el registro del flujo de entrada, result indica si la transformación se ha realizado correctamente, y data contiene la salida transformada codificada en Base64 de la función de Lambda. Para obtener más información, consulte Parámetros necesarios para la transformación de datos.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1" }

Para especificar la información de enrutamiento a Firehose sobre cómo enrutar el registro del flujo a una tabla de destino como parte de la función de Lambda, el resultado de la función de Lambda debe contener una sección adicional para metadata. En el siguiente ejemplo, se muestra cómo se agrega la sección de metadatos al resultado de Lambda para un flujo de Firehose que usa Kinesis Data Streams como origen de datos para indicar a Firehose que debe insertar el registro como un registro nuevo en la tabla denominada Device1234 de la base de datos IoTevents.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

Del mismo modo, en el siguiente ejemplo se muestra cómo se puede añadir la sección de metadatos al resultado de Lambda para un Firehose que utilice Amazon Managed Streaming para Apache Kafka como origen de datos para indicarle a Firehose que debe insertar el registro como un registro nuevo en la tabla denominada Device1234 en la base de datos IoTevents.

{ "recordId": "49655962066601463032522589543535113056108699331451682818000000", "result": "Ok", "kafkaRecordValue": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1", "metadata":{ "otfMetadata":{ "destinationTableName":"Device1234", "destinationDatabaseName":"IoTevents", "operation":"insert" } } }

En este ejemplo,

  • destinationDatabaseName hace referencia al nombre de la base de datos de destino y es un campo obligatorio.

  • destinationTableName hace referencia al nombre de la tabla de destino y es un campo obligatorio.

  • operation es un campo opcional con los siguientes valores posibles: insert, update y delete. Si no especifica un valor, la operación predeterminada es insert.

nota
  • Si especificas la operación como update odelete, debes especificar claves únicas para la tabla de destino cuando configuras tu transmisión Firehose, o establecerla identifier-field-idsen Iceberg cuando ejecutas las operaciones de creación de tabla o modificación de tabla en Iceberg. Si no se especifica, Firehose generará un error y enviará los datos a un bucket de errores de S3.

  • Los valores de Database Name y Table Name deben coincidir exactamente con los nombres de la base de datos y la tabla de destino. Si no coinciden, Firehose arroja un error y envía los datos a un bucket de errores de S3.

  • Cuando la transmisión de Firehose tiene una función de transformación Lambda y una expresión JSONQuery , Firehose comprueba primero el campo de metadatos en la salida de Lambda para determinar cómo enrutar el registro a la tabla de destino adecuada y, a continuación, examina el resultado de la expresión para ver si faltan campos. JSONQuery

    Si la Lambda o la JSONQuery expresión no proporcionan la información de enrutamiento requerida, Firehose asume que se trata de un escenario de una sola tabla y busca la información de una sola tabla en la configuración de claves únicas.

    Para obtener más información, consulte la tabla Enrutar los registros entrantes a una única tabla de Iceberg. Si Firehose no determina la información de enrutamiento ni hace coincidir el registro con una tabla de destino específica, enviará los datos al bucket de errores de S3 especificado.

Función de Lambda de ejemplo

Esta función de Lambda es un ejemplo de código de Python que analiza los registros del flujo entrante y agrega los campos obligatorios para especificar cómo se deben escribir los datos en tablas específicas. Puede usar este código de ejemplo para agregar la sección de metadatos para la información de enrutamiento.

import json import base64 def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn']) firehose_records_output = {} firehose_records_output['records'] = [] for firehose_record_input in firehose_records_input['records']: # Get payload from Lambda input, it could be different with different sources if 'kafkaRecordValue' in firehose_record_input: payload_bytes = base64.b64decode(firehose_record_input['kafkaRecordValue']).decode('utf-8') else payload_bytes = base64.b64decode(firehose_record_input['data']).decode('utf-8') # perform data processing on customer payload bytes here # Create output with proper record ID, output data (may be different with different sources), result, and metadata firehose_record_output = {} if 'kafkaRecordValue' in firehose_record_input: firehose_record_output['kafkaRecordValue'] = base64.b64encode(payload_bytes.encode('utf-8')) else firehose_record_output['data'] = base64.b64encode(payload_bytes.encode('utf-8')) firehose_record_output['recordId'] = firehose_record_input['recordId'] firehose_record_output['result'] = 'Ok' firehose_record_output['metadata'] = { 'otfMetadata': { 'destinationDatabaseName': 'your_destination_database', 'destinationTableName': 'your_destination_table', 'operation': 'insert' } } firehose_records_output['records'].append(firehose_record_output) return firehose_records_output