Usare un argomento di Kafka come destinazione in caso di errore - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Usare un argomento di Kafka come destinazione in caso di errore

È possibile configurare un argomento di Kafka come destinazione in caso di errore per le mappature delle sorgenti degli eventi Kafka. Quando Lambda non è in grado di elaborare i record dopo estenuanti tentativi o quando i record superano l'età massima, Lambda invia i record non riusciti all'argomento Kafka specificato per un'elaborazione successiva.

Come funziona una destinazione Kafka in caso di errore

Quando si configura un argomento di Kafka come destinazione in caso di errore, Lambda funge da produttore Kafka e scrive i record non riusciti nell'argomento di destinazione. Questo crea un pattern DLT (Dead Letter Topic) all'interno dell'infrastruttura Kafka.

  • Stesso requisito del cluster: l'argomento di destinazione deve esistere nello stesso cluster Kafka degli argomenti di origine.

  • Contenuto effettivo dei record: le destinazioni Kafka ricevono i record effettivamente non riusciti insieme ai metadati di errore.

  • Prevenzione della ricorsione: Lambda previene i loop infiniti bloccando le configurazioni in cui gli argomenti di origine e destinazione coincidono.

Configurazione di una destinazione Kafka in caso di errore

È possibile configurare un argomento di Kafka come destinazione in caso di errore durante la creazione o l'aggiornamento di una mappatura dell'origine degli eventi Kafka.

Configurazione di una destinazione Kafka (console)

Configurare un argomento di Kafka come destinazione in caso di errore (console)
  1. Aprire la pagina Funzioni della console Lambda.

  2. Scegliete il nome della funzione.

  3. Esegui una delle seguenti operazioni:

    • Per aggiungere un nuovo trigger Kafka, in Panoramica delle funzioni, scegli Aggiungi trigger.

    • Per modificare un trigger Kafka esistente, scegli il trigger, quindi scegli Modifica.

  4. In Impostazioni aggiuntive, per Destinazione in caso di errore, scegli l'argomento Kafka.

  5. Per Nome argomento, inserisci il nome dell'argomento Kafka a cui desideri inviare i record non riusciti.

  6. Scegli Aggiungi o Salva.

Configurazione di una destinazione Kafka ()AWS CLI

Utilizzate il kafka:// prefisso per specificare un argomento di Kafka come destinazione in caso di errore.

Creazione di una mappatura della sorgente degli eventi con la destinazione Kafka

L'esempio seguente crea una mappatura dell'origine degli eventi Amazon MSK con un argomento Kafka come destinazione in caso di errore:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Per Kafka autogestito, usa la stessa sintassi:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Aggiornamento di una destinazione Kafka

Usa il update-event-source-mapping comando per aggiungere o modificare una destinazione Kafka:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Formato di registrazione per una destinazione Kafka

Quando Lambda invia record non riusciti a un argomento di Kafka, ogni messaggio contiene sia metadati sull'errore che il contenuto effettivo del record.

Metadati di errore

I metadati includono informazioni sul motivo per cui il record ha avuto esito negativo e dettagli sul batch originale:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

Comportamento delle chiavi di partizione

Lambda utilizza la stessa chiave di partizione del record originale per la produzione sull'argomento di destinazione. Se il record originale non aveva alcuna chiave, Lambda utilizza il partizionamento round-robin predefinito di Kafka su tutte le partizioni disponibili nell'argomento di destinazione.

Requisiti e limitazioni

  • È richiesta la modalità di provisioning: una destinazione Kafka in caso di errore è disponibile solo per le mappature delle sorgenti degli eventi con la modalità provisioning abilitata.

  • Solo stesso cluster: l'argomento di destinazione deve esistere nello stesso cluster Kafka degli argomenti di origine.

  • Autorizzazioni per gli argomenti: la mappatura dei sorgenti degli eventi deve disporre delle autorizzazioni di scrittura sull'argomento di destinazione. Esempio:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Nessuna ricorsione: il nome dell'argomento di destinazione non può essere lo stesso dei nomi degli argomenti di origine.